varta_watch/listener.rs
1//! Transport abstraction for receiving VLP frames.
2//!
3//! The [`BeatListener`] trait is the pluggable receive backend for [`Observer`].
4//! [`UdsListener`] provides the default Unix Domain Socket implementation;
5//! alternative transports (e.g. UDP) are available behind feature flags.
6//!
7//! [`Observer`]: crate::Observer
8
9use core::marker::PhantomData;
10use std::io::{self, ErrorKind};
11use std::os::unix::fs::PermissionsExt;
12use std::os::unix::io::AsRawFd;
13use std::os::unix::net::UnixDatagram;
14use std::path::{Path, PathBuf};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
18/// Count of `fsync_parent_dir` failures since process start. Incremented on
19/// every bind where the parent-directory fsync succeeds at the OS level but
20/// returns an error (e.g. `EINVAL` on platforms that do not support directory
21/// fsync). Drained by [`drain_bind_dir_fsync_failures`].
22static DIR_FSYNC_FAILED: AtomicU64 = AtomicU64::new(0);
23
24/// Drain and reset the parent-directory fsync failure counter.
25///
26/// Returns the number of `fsync_parent_dir` calls that failed since the last
27/// drain (typically since process start, because bind runs once). Called by
28/// the observer poll loop to surface `varta_socket_bind_dir_fsync_failed_total`
29/// via the Prometheus exporter.
30pub fn drain_bind_dir_fsync_failures() -> u64 {
31 DIR_FSYNC_FAILED.swap(0, Ordering::Relaxed)
32}
33
34extern "C" {
35 fn umask(mode: u32) -> u32;
36}
37
38// POSIX setsockopt / getsockopt for SO_RCVBUF tuning.
39extern "C" {
40 fn setsockopt(
41 fd: i32,
42 level: i32,
43 optname: i32,
44 optval: *const core::ffi::c_void,
45 optlen: u32,
46 ) -> i32;
47 fn getsockopt(
48 fd: i32,
49 level: i32,
50 optname: i32,
51 optval: *mut core::ffi::c_void,
52 optlen: *mut u32,
53 ) -> i32;
54}
55
56// SOL_SOCKET / SO_RCVBUF — platform-scoped FFI constants for setsockopt(2).
57//
58// Sources verified against vendor headers:
59// linux: include/uapi/asm-generic/socket.h (SOL_SOCKET=1, SO_RCVBUF=8)
60// macOS: xnu/bsd/sys/socket.h (SOL_SOCKET=0xffff, SO_RCVBUF=0x1002)
61// *BSD: sys/socket.h (SOL_SOCKET=0xffff, SO_RCVBUF=0x1002)
62// illumos: usr/src/uts/common/sys/socket.h (SOL_SOCKET=0xffff, SO_RCVBUF=0x1002)
63//
64// Adding a new target_os requires verifying these two constants against the
65// platform's headers and extending the cfg-any lists below. The `compile_error!`
66// fallback prevents silent drift to wrong values on an untested platform.
67#[cfg(target_os = "linux")]
68const SOL_SOCKET: i32 = 1;
69#[cfg(any(
70 target_os = "macos",
71 target_os = "ios",
72 target_os = "freebsd",
73 target_os = "dragonfly",
74 target_os = "netbsd",
75 target_os = "openbsd",
76 target_os = "illumos",
77 target_os = "solaris",
78))]
79const SOL_SOCKET: i32 = 0xffff_u32 as i32;
80
81#[cfg(target_os = "linux")]
82const SO_RCVBUF: i32 = 8;
83#[cfg(any(
84 target_os = "macos",
85 target_os = "ios",
86 target_os = "freebsd",
87 target_os = "dragonfly",
88 target_os = "netbsd",
89 target_os = "openbsd",
90 target_os = "illumos",
91 target_os = "solaris",
92))]
93const SO_RCVBUF: i32 = 0x1002;
94
95#[cfg(not(any(
96 target_os = "linux",
97 target_os = "macos",
98 target_os = "ios",
99 target_os = "freebsd",
100 target_os = "dragonfly",
101 target_os = "netbsd",
102 target_os = "openbsd",
103 target_os = "illumos",
104 target_os = "solaris",
105)))]
106compile_error!(
107 "varta-watch has no verified SOL_SOCKET / SO_RCVBUF values for this target_os. \
108 Verify against the platform's <sys/socket.h> and extend the cfg-any lists in \
109 crates/varta-watch/src/listener.rs."
110);
111
112/// Attests that the process is single-threaded at construction time.
113///
114/// [`UdsListener::bind`] calls `umask(2)`, which is process-wide; any thread
115/// creating filesystem objects during the bind window would inherit the
116/// restricted umask. Holding a `&PreThreadAttestation` encodes the
117/// single-threaded precondition in the type signature so the invariant is
118/// enforced at compile time, not just by convention.
119///
120/// Construct exactly once at the top of `fn main`, before any thread spawn:
121///
122/// ```text
123/// let pre_thread = PreThreadAttestation::new()?;
124/// // … then pass &pre_thread to Observer::bind / UdsListener::bind
125/// ```
126///
127/// The token is `!Send + !Sync` (via `PhantomData<*const ()>`) so it cannot
128/// be moved into or shared across thread boundaries after construction.
129#[derive(Debug)]
130pub struct PreThreadAttestation {
131 _no_send: PhantomData<*const ()>,
132}
133
134impl PreThreadAttestation {
135 /// Probe the OS thread count and return a token if the process is
136 /// single-threaded.
137 ///
138 /// On Linux counts `/proc/self/task/` entries. On macOS calls
139 /// `pthread_is_threaded_np(3)`. On other platforms the runtime probe is
140 /// skipped; the type-level structural guarantee still holds.
141 ///
142 /// # Errors
143 ///
144 /// Returns [`io::ErrorKind::Other`] if the process has more than one
145 /// thread, or if the Linux `/proc/self/task` directory is unreadable.
146 pub fn new() -> io::Result<Self> {
147 Self::probe()?;
148 Ok(Self {
149 _no_send: PhantomData,
150 })
151 }
152
153 /// Create a token without a runtime probe.
154 ///
155 /// Intended for test code where the multi-threaded test runner would
156 /// incorrectly fail the probe even though the umask window is benign.
157 ///
158 /// # Safety
159 ///
160 /// The caller must ensure that no concurrent thread creates filesystem
161 /// objects during the `UdsListener::bind` window, or that any such race
162 /// is acceptable in the calling context.
163 pub unsafe fn new_unchecked() -> Self {
164 Self {
165 _no_send: PhantomData,
166 }
167 }
168
169 #[cfg(target_os = "linux")]
170 fn probe() -> io::Result<()> {
171 let mut count: usize = 0;
172 for entry in std::fs::read_dir("/proc/self/task")? {
173 entry?;
174 count += 1;
175 if count > 1 {
176 return Err(io::Error::new(
177 io::ErrorKind::Other,
178 "process is multi-threaded; UdsListener::bind changes \
179 umask(2) process-wide and would race concurrent file creation",
180 ));
181 }
182 }
183 Ok(())
184 }
185
186 #[cfg(target_os = "macos")]
187 fn probe() -> io::Result<()> {
188 extern "C" {
189 // Available in macOS pthread.h since 10.0.
190 // Returns 0 when single-threaded, 1 when multi-threaded.
191 fn pthread_is_threaded_np() -> i32;
192 }
193 // SAFETY: pthread_is_threaded_np is a pure read of a per-process flag
194 // with no side effects and a stable ABI across all macOS versions.
195 if unsafe { pthread_is_threaded_np() } != 0 {
196 return Err(io::Error::new(
197 io::ErrorKind::Other,
198 "process is multi-threaded; UdsListener::bind changes \
199 umask(2) process-wide and would race concurrent file creation",
200 ));
201 }
202 Ok(())
203 }
204
205 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
206 fn probe() -> io::Result<()> {
207 // No per-platform thread-count probe is implemented here.
208 // The type-level guarantee — UdsListener::bind requires a
209 // &PreThreadAttestation that can only be soundly constructed before
210 // the first thread spawn — remains the primary enforcement.
211 Ok(())
212 }
213}
214
215/// RAII guard that restores the process umask on drop, even if a panic
216/// unwinds through the bind path.
217struct UmaskGuard(u32);
218
219impl Drop for UmaskGuard {
220 fn drop(&mut self) {
221 unsafe {
222 umask(self.0);
223 }
224 }
225}
226
227use crate::peer_cred::{self, RecvResult};
228
229/// Per-listener operator trust declaration for recovery eligibility.
230///
231/// Passed to a listener's builder method to promote UDP-origin beats from
232/// [`BeatOrigin::NetworkUnverified`] to [`BeatOrigin::OperatorAttestedTransport`].
233/// Recovery commands will then fire for stalls on that listener, as they
234/// would for kernel-attested UDS beats.
235///
236/// This is a structural enforcement of per-listener trust — there is no
237/// daemon-wide way to grant recovery trust to a UDP listener.
238#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
239pub enum TransportTrust {
240 /// No operator declaration — beats from this listener are stamped
241 /// [`BeatOrigin::NetworkUnverified`]. Recovery is refused for stalls.
242 #[default]
243 Untrusted,
244 /// Operator has explicitly accepted the security risk for this listener.
245 /// Beats are stamped [`BeatOrigin::OperatorAttestedTransport`] so the
246 /// runtime recovery gate allows them to fire.
247 Operator,
248}
249
250/// Abstraction over a source that can receive 32-byte VLP frames.
251///
252/// Implementations must be `Send + 'static` so [`Observer`] can be moved.
253///
254/// [`Observer`]: crate::Observer
255pub trait BeatListener: Send + 'static {
256 /// Receive one datagram. Returns the same `RecvResult` as
257 /// `peer_cred::recv_authenticated` — callers see `Authenticated`,
258 /// `WouldBlock`, `ShortRead`, or `IoError`.
259 fn recv(&mut self) -> RecvResult;
260
261 /// Drain and reset the AEAD decryption failure counter.
262 ///
263 /// The default implementation returns 0 — only listeners that perform
264 /// authenticated decryption will override this.
265 fn drain_decrypt_failures(&mut self) -> u64 {
266 0
267 }
268
269 /// Drain and reset the truncated-datagram counter.
270 fn drain_truncated(&mut self) -> u64 {
271 0
272 }
273
274 /// Drain and reset the sender-state-full counter.
275 ///
276 /// Incremented when the per-sender replay map is at capacity and a
277 /// stale-sender sweep fails to free space, forcing eviction of the
278 /// oldest entry. Only listeners that maintain per-sender state will
279 /// override this.
280 fn drain_sender_state_full(&mut self) -> u64 {
281 0
282 }
283
284 /// Drain and reset the AEAD-decryption-attempt counter.
285 ///
286 /// Counted by listeners that trial every loaded key on every frame
287 /// without early-exit on success — the constant-trial-count poll that
288 /// closes the key-rotation timing side-channel. Only the secure-UDP
289 /// listener overrides this.
290 fn drain_aead_attempts(&mut self) -> u64 {
291 0
292 }
293}
294
295/// Unix Domain Socket listener for local IPC.
296///
297/// Created via [`UdsListener::bind`] and used as the default backend for
298/// [`Observer::bind`].
299///
300/// [`Observer::bind`]: crate::Observer::bind
301pub struct UdsListener {
302 sock: UnixDatagram,
303 path: PathBuf,
304 bound_dev: u64,
305 bound_ino: u64,
306 truncated_count: u64,
307 /// Effective `SO_RCVBUF` granted by the kernel (may be less than
308 /// requested due to `net.core.rmem_max`). `0` means no tuning was
309 /// attempted (`--uds-rcvbuf-bytes 0`).
310 rcvbuf_bytes: u32,
311}
312
313impl UdsListener {
314 /// Bind a Unix datagram socket at `path` and return a [`UdsListener`].
315 ///
316 /// The socket file permissions are set to `socket_mode` (octal, e.g.
317 /// `0o600`) after a successful bind. Credential passing is enabled on
318 /// the socket so that `recv` can verify the PID of every sender against
319 /// the kernel's `SO_PASSCRED` / `LOCAL_CREDS` attestation.
320 ///
321 /// If a genuine stale socket exists at `path` (a socket inode with no
322 /// listener), it is cleaned up and the bind succeeds. If another
323 /// process is already listening at `path`, or if the path is occupied by
324 /// a non-socket file, the call fails with `AddrInUse`.
325 ///
326 /// The socket is given a read timeout so `recv` cannot block
327 /// indefinitely.
328 pub fn bind(
329 path: impl AsRef<Path>,
330 socket_mode: u32,
331 read_timeout: Duration,
332 uds_rcvbuf_bytes: u32,
333 _pre_thread: &PreThreadAttestation,
334 ) -> io::Result<Self> {
335 let path = path.as_ref();
336 let owned_path: PathBuf = path.to_path_buf();
337
338 let restrict_umask = !socket_mode & 0o777;
339 let _umask_guard = UmaskGuard(unsafe { umask(restrict_umask) });
340 let bind_result = UnixDatagram::bind(path);
341 let sock = match bind_result {
342 Ok(sock) => sock,
343 Err(e) if e.kind() == ErrorKind::AddrInUse => {
344 let PathOccupant::Socket(stale_socket) = path_occupant(path)? else {
345 return Err(io::Error::new(
346 ErrorKind::AddrInUse,
347 format!(
348 "cannot bind observer socket at {}: path exists and is not a socket",
349 path.display()
350 ),
351 ));
352 };
353
354 match probe_live(path) {
355 Ok(true) => {
356 return Err(io::Error::new(
357 ErrorKind::AddrInUse,
358 format!(
359 "another varta-watch is already running at {}",
360 path.display()
361 ),
362 ));
363 }
364 Ok(false) => {
365 match path_occupant(path)? {
366 PathOccupant::Socket(current) if current == stale_socket => {
367 std::fs::remove_file(path)?;
368 }
369 PathOccupant::Missing => {}
370 PathOccupant::Socket(_) => {
371 return Err(io::Error::new(
372 ErrorKind::AddrInUse,
373 format!(
374 "observer socket path changed while probing {}; retry bind",
375 path.display()
376 ),
377 ));
378 }
379 PathOccupant::Other => {
380 return Err(io::Error::new(
381 ErrorKind::AddrInUse,
382 format!(
383 "cannot bind observer socket at {}: path exists and is not a socket",
384 path.display()
385 ),
386 ));
387 }
388 }
389 let _umask_guard = UmaskGuard(unsafe { umask(restrict_umask) });
390 let sock = UnixDatagram::bind(path)?;
391 std::fs::set_permissions(
392 path,
393 std::fs::Permissions::from_mode(socket_mode),
394 )?;
395 return Self::finish_bind(sock, owned_path, read_timeout, uds_rcvbuf_bytes);
396 }
397 Err(e) => {
398 return Err(io::Error::new(
399 e.kind(),
400 format!("cannot probe socket at {}: {e}", path.display()),
401 ));
402 }
403 }
404 }
405 Err(e) => return Err(e),
406 };
407
408 std::fs::set_permissions(path, std::fs::Permissions::from_mode(socket_mode))?;
409 Self::finish_bind(sock, owned_path, read_timeout, uds_rcvbuf_bytes)
410 }
411
412 fn finish_bind(
413 sock: UnixDatagram,
414 path: PathBuf,
415 read_timeout: Duration,
416 uds_rcvbuf_bytes: u32,
417 ) -> io::Result<Self> {
418 use std::os::unix::fs::MetadataExt;
419
420 sock.set_read_timeout(Some(read_timeout))?;
421 let raw_fd = sock.as_raw_fd();
422 peer_cred::enable_credential_passing(raw_fd)?;
423
424 let meta = std::fs::metadata(&path)?;
425 let bound_dev = meta.dev();
426 let bound_ino = meta.ino();
427
428 // Fsync the parent directory so the unlink+bind+chmod sequence is
429 // durable across power loss or an unclean shutdown. The bind has
430 // already succeeded — a directory-fsync failure is treated as a soft
431 // durability degradation rather than a startup failure (some exotic
432 // platforms return EINVAL for directory fsync).
433 if let Err(e) = fsync_parent_dir(&path) {
434 crate::varta_warn!(
435 "uds bind: parent-directory fsync failed (durability degraded): {e}"
436 );
437 DIR_FSYNC_FAILED.fetch_add(1, Ordering::Relaxed);
438 }
439
440 let granted_rcvbuf = if uds_rcvbuf_bytes > 0 {
441 set_rcvbuf(raw_fd, uds_rcvbuf_bytes).unwrap_or(0)
442 } else {
443 0
444 };
445
446 Ok(UdsListener {
447 sock,
448 path,
449 bound_dev,
450 bound_ino,
451 truncated_count: 0,
452 rcvbuf_bytes: granted_rcvbuf,
453 })
454 }
455}
456
457impl BeatListener for UdsListener {
458 fn recv(&mut self) -> RecvResult {
459 match peer_cred::recv_authenticated(self.sock.as_raw_fd()) {
460 RecvResult::ShortRead => {
461 self.truncated_count = self.truncated_count.wrapping_add(1);
462 RecvResult::ShortRead
463 }
464 other => other,
465 }
466 }
467
468 fn drain_truncated(&mut self) -> u64 {
469 let n = self.truncated_count;
470 self.truncated_count = 0;
471 n
472 }
473}
474
475impl Drop for UdsListener {
476 fn drop(&mut self) {
477 use std::os::unix::fs::MetadataExt;
478 if let Ok(meta) = std::fs::metadata(&self.path) {
479 if meta.dev() == self.bound_dev && meta.ino() == self.bound_ino {
480 let _ = std::fs::remove_file(&self.path);
481 }
482 }
483 }
484}
485
486impl UdsListener {
487 /// Effective `SO_RCVBUF` size granted by the kernel for this socket,
488 /// in bytes. `0` if `--uds-rcvbuf-bytes 0` was used or tuning failed.
489 pub fn rcvbuf_bytes(&self) -> u32 {
490 self.rcvbuf_bytes
491 }
492}
493
494/// Fsync the directory containing `path` so the unlink+bind+chmod sequence
495/// is durable across power loss. Uses `sync_all` (`fsync(2)`) rather than
496/// `sync_data` (`fdatasync(2)`) because directory entries are metadata and
497/// `fdatasync` is not guaranteed to flush them.
498fn fsync_parent_dir(path: &Path) -> io::Result<()> {
499 let parent = path.parent().unwrap_or_else(|| Path::new("."));
500 let dir = std::fs::File::open(parent)?;
501 dir.sync_all()
502}
503
504/// Set and read back `SO_RCVBUF` on `fd`. Returns the kernel-granted size
505/// (which Linux doubles then clamps to `net.core.rmem_max`). Fails soft on
506/// `EPERM` (unprivileged observer, low `rmem_max`).
507fn set_rcvbuf(fd: i32, bytes: u32) -> io::Result<u32> {
508 use core::ffi::c_void;
509 use core::mem;
510
511 let val = bytes as i32;
512 let ret = unsafe {
513 setsockopt(
514 fd,
515 SOL_SOCKET,
516 SO_RCVBUF,
517 &val as *const i32 as *const c_void,
518 mem::size_of::<i32>() as u32,
519 )
520 };
521 if ret != 0 {
522 return Err(io::Error::last_os_error());
523 }
524 // Read back the effective value (kernel may grant double the requested).
525 let mut granted: i32 = 0;
526 let mut optlen = mem::size_of::<i32>() as u32;
527 let ret = unsafe {
528 getsockopt(
529 fd,
530 SOL_SOCKET,
531 SO_RCVBUF,
532 &mut granted as *mut i32 as *mut c_void,
533 &mut optlen,
534 )
535 };
536 if ret != 0 {
537 return Err(io::Error::last_os_error());
538 }
539 Ok(granted.max(0) as u32)
540}
541
542#[derive(Clone, Copy, Eq, PartialEq)]
543struct SocketIdentity {
544 dev: u64,
545 ino: u64,
546}
547
548enum PathOccupant {
549 Missing,
550 Socket(SocketIdentity),
551 Other,
552}
553
554fn path_occupant(path: &Path) -> io::Result<PathOccupant> {
555 use std::os::unix::fs::{FileTypeExt, MetadataExt};
556
557 match std::fs::symlink_metadata(path) {
558 Ok(meta) if meta.file_type().is_socket() => Ok(PathOccupant::Socket(SocketIdentity {
559 dev: meta.dev(),
560 ino: meta.ino(),
561 })),
562 Ok(_) => Ok(PathOccupant::Other),
563 Err(e) if e.kind() == ErrorKind::NotFound => Ok(PathOccupant::Missing),
564 Err(e) => Err(e),
565 }
566}
567
568/// Probe whether a live listener is accepting datagrams at `path`.
569fn probe_live(path: &Path) -> io::Result<bool> {
570 let sock = UnixDatagram::unbound()?;
571
572 if let Err(e) = sock.connect(path) {
573 return match e.kind() {
574 ErrorKind::PermissionDenied => Err(e),
575 _ => Ok(false),
576 };
577 }
578
579 match sock.send(&[]) {
580 Ok(_) => Ok(true),
581 Err(e) if e.kind() == ErrorKind::PermissionDenied => Err(e),
582 Err(_) => Ok(false),
583 }
584}
585
586// ---------------------------------------------------------------------------
587// Plaintext UDP listener (feature-gated behind `unsafe-plaintext-udp`)
588//
589// This transport has NO cryptographic authentication. It is exposed only
590// when the operator opts in at *both* compile time (Cargo feature flag whose
591// name starts with `unsafe-`) and runtime (`--i-accept-plaintext-udp`). In
592// any other configuration the plaintext path is structurally unreachable.
593// ---------------------------------------------------------------------------
594
595#[cfg(feature = "unsafe-plaintext-udp")]
596mod udp_impl {
597 use std::io;
598 use std::net::{SocketAddr, UdpSocket};
599
600 use crate::peer_cred::{BeatOrigin, RecvResult};
601
602 use super::BeatListener;
603
604 /// UDP listener for network-based observers.
605 ///
606 /// Receives 32-byte VLP frames over UDP from remote agents. Created via
607 /// [`UdpListener::bind`] and used with [`Observer::from_listener`].
608 ///
609 /// # Security: no authentication
610 ///
611 /// Plain UDP has NO cryptographic authentication — any device on the
612 /// local subnet can inject arbitrary frames. Frame-level PID
613 /// verification is impossible because UDP lacks kernel credential
614 /// attestation (`peer_pid` is always 0). **Do not use this transport in
615 /// production without network segmentation (firewall, VPC) that limits
616 /// which hosts can reach the observer port.**
617 ///
618 /// For authenticated transport, see [`SecureUdpListener`], which provides
619 /// ChaCha20-Poly1305 AEAD per-agent and/or per-epoch master-key decryption
620 /// behind the `secure-udp` feature flag.
621 ///
622 /// The observer emits a startup warning via stderr whenever plaintext UDP
623 /// is in use.
624 ///
625 /// [`Observer::from_listener`]: crate::Observer::from_listener
626 /// [`SecureUdpListener`]: crate::secure_listener::SecureUdpListener
627 pub struct UdpListener {
628 sock: UdpSocket,
629 truncated_count: u64,
630 recovery_trust: super::TransportTrust,
631 }
632
633 impl UdpListener {
634 /// Bind a non-blocking UDP socket on `addr` and return a [`UdpListener`].
635 ///
636 /// # Errors
637 ///
638 /// Returns an [`io::Error`] if the socket cannot be bound or switched
639 /// to non-blocking mode.
640 pub fn bind(addr: SocketAddr) -> io::Result<Self> {
641 let sock = UdpSocket::bind(addr)?;
642 sock.set_nonblocking(true)?;
643 Ok(UdpListener {
644 sock,
645 truncated_count: 0,
646 recovery_trust: super::TransportTrust::Untrusted,
647 })
648 }
649
650 /// Declare this listener recovery-eligible.
651 ///
652 /// When `trust` is [`TransportTrust::Operator`], beats received on
653 /// this listener are stamped [`BeatOrigin::OperatorAttestedTransport`]
654 /// so the runtime recovery gate allows them to fire.
655 pub fn with_recovery_trust(mut self, trust: super::TransportTrust) -> Self {
656 self.recovery_trust = trust;
657 self
658 }
659 }
660
661 impl BeatListener for UdpListener {
662 fn recv(&mut self) -> RecvResult {
663 let mut buf = [0u8; 32];
664 let origin = match self.recovery_trust {
665 super::TransportTrust::Operator => BeatOrigin::OperatorAttestedTransport,
666 super::TransportTrust::Untrusted => BeatOrigin::NetworkUnverified,
667 };
668 loop {
669 match self.sock.recv(&mut buf) {
670 Ok(32) => {
671 return RecvResult::Authenticated {
672 peer_pid: 0,
673 peer_uid: 0,
674 // UDP carries no kernel-attested namespace identity.
675 peer_pid_ns_inode: None,
676 origin,
677 data: buf,
678 };
679 }
680 Ok(_) => {
681 self.truncated_count = self.truncated_count.wrapping_add(1);
682 continue;
683 }
684 Err(e) => match e.kind() {
685 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
686 return RecvResult::WouldBlock;
687 }
688 io::ErrorKind::Interrupted => continue,
689 _ => return RecvResult::IoError(e),
690 },
691 }
692 }
693 }
694
695 fn drain_truncated(&mut self) -> u64 {
696 let n = self.truncated_count;
697 self.truncated_count = 0;
698 n
699 }
700 }
701}
702
703#[cfg(feature = "unsafe-plaintext-udp")]
704pub use udp_impl::UdpListener;