Skip to main content

varta_watch/
listener.rs

1//! Transport abstraction for receiving VLP frames.
2//!
3//! The [`BeatListener`] trait is the pluggable receive backend for [`Observer`].
4//! [`UdsListener`] provides the default Unix Domain Socket implementation;
5//! alternative transports (e.g. UDP) are available behind feature flags.
6//!
7//! [`Observer`]: crate::Observer
8
9use core::marker::PhantomData;
10use std::io::{self, ErrorKind};
11use std::os::unix::fs::PermissionsExt;
12use std::os::unix::io::AsRawFd;
13use std::os::unix::net::UnixDatagram;
14use std::path::{Path, PathBuf};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
18/// Count of `fsync_parent_dir` failures since process start.  Incremented on
19/// every bind where the parent-directory fsync succeeds at the OS level but
20/// returns an error (e.g. `EINVAL` on platforms that do not support directory
21/// fsync).  Drained by [`drain_bind_dir_fsync_failures`].
22static DIR_FSYNC_FAILED: AtomicU64 = AtomicU64::new(0);
23
24/// Drain and reset the parent-directory fsync failure counter.
25///
26/// Returns the number of `fsync_parent_dir` calls that failed since the last
27/// drain (typically since process start, because bind runs once).  Called by
28/// the observer poll loop to surface `varta_socket_bind_dir_fsync_failed_total`
29/// via the Prometheus exporter.
30pub fn drain_bind_dir_fsync_failures() -> u64 {
31    DIR_FSYNC_FAILED.swap(0, Ordering::Relaxed)
32}
33
34extern "C" {
35    fn umask(mode: u32) -> u32;
36}
37
38// POSIX setsockopt / getsockopt for SO_RCVBUF tuning.
39extern "C" {
40    fn setsockopt(
41        fd: i32,
42        level: i32,
43        optname: i32,
44        optval: *const core::ffi::c_void,
45        optlen: u32,
46    ) -> i32;
47    fn getsockopt(
48        fd: i32,
49        level: i32,
50        optname: i32,
51        optval: *mut core::ffi::c_void,
52        optlen: *mut u32,
53    ) -> i32;
54}
55
56// SOL_SOCKET / SO_RCVBUF — platform-scoped FFI constants for setsockopt(2).
57//
58// Sources verified against vendor headers:
59//   linux:   include/uapi/asm-generic/socket.h    (SOL_SOCKET=1,      SO_RCVBUF=8)
60//   macOS:   xnu/bsd/sys/socket.h                 (SOL_SOCKET=0xffff, SO_RCVBUF=0x1002)
61//   *BSD:    sys/socket.h                         (SOL_SOCKET=0xffff, SO_RCVBUF=0x1002)
62//   illumos: usr/src/uts/common/sys/socket.h      (SOL_SOCKET=0xffff, SO_RCVBUF=0x1002)
63//
64// Adding a new target_os requires verifying these two constants against the
65// platform's headers and extending the cfg-any lists below. The `compile_error!`
66// fallback prevents silent drift to wrong values on an untested platform.
67#[cfg(target_os = "linux")]
68const SOL_SOCKET: i32 = 1;
69#[cfg(any(
70    target_os = "macos",
71    target_os = "ios",
72    target_os = "freebsd",
73    target_os = "dragonfly",
74    target_os = "netbsd",
75    target_os = "openbsd",
76    target_os = "illumos",
77    target_os = "solaris",
78))]
79const SOL_SOCKET: i32 = 0xffff_u32 as i32;
80
81#[cfg(target_os = "linux")]
82const SO_RCVBUF: i32 = 8;
83#[cfg(any(
84    target_os = "macos",
85    target_os = "ios",
86    target_os = "freebsd",
87    target_os = "dragonfly",
88    target_os = "netbsd",
89    target_os = "openbsd",
90    target_os = "illumos",
91    target_os = "solaris",
92))]
93const SO_RCVBUF: i32 = 0x1002;
94
95#[cfg(not(any(
96    target_os = "linux",
97    target_os = "macos",
98    target_os = "ios",
99    target_os = "freebsd",
100    target_os = "dragonfly",
101    target_os = "netbsd",
102    target_os = "openbsd",
103    target_os = "illumos",
104    target_os = "solaris",
105)))]
106compile_error!(
107    "varta-watch has no verified SOL_SOCKET / SO_RCVBUF values for this target_os. \
108     Verify against the platform's <sys/socket.h> and extend the cfg-any lists in \
109     crates/varta-watch/src/listener.rs."
110);
111
112/// Attests that the process is single-threaded at construction time.
113///
114/// [`UdsListener::bind`] calls `umask(2)`, which is process-wide; any thread
115/// creating filesystem objects during the bind window would inherit the
116/// restricted umask. Holding a `&PreThreadAttestation` encodes the
117/// single-threaded precondition in the type signature so the invariant is
118/// enforced at compile time, not just by convention.
119///
120/// Construct exactly once at the top of `fn main`, before any thread spawn:
121///
122/// ```text
123/// let pre_thread = PreThreadAttestation::new()?;
124/// // … then pass &pre_thread to Observer::bind / UdsListener::bind
125/// ```
126///
127/// The token is `!Send + !Sync` (via `PhantomData<*const ()>`) so it cannot
128/// be moved into or shared across thread boundaries after construction.
129#[derive(Debug)]
130pub struct PreThreadAttestation {
131    _no_send: PhantomData<*const ()>,
132}
133
134impl PreThreadAttestation {
135    /// Probe the OS thread count and return a token if the process is
136    /// single-threaded.
137    ///
138    /// On Linux counts `/proc/self/task/` entries. On macOS calls
139    /// `pthread_is_threaded_np(3)`. On other platforms the runtime probe is
140    /// skipped; the type-level structural guarantee still holds.
141    ///
142    /// # Errors
143    ///
144    /// Returns [`io::ErrorKind::Other`] if the process has more than one
145    /// thread, or if the Linux `/proc/self/task` directory is unreadable.
146    pub fn new() -> io::Result<Self> {
147        Self::probe()?;
148        Ok(Self {
149            _no_send: PhantomData,
150        })
151    }
152
153    /// Create a token without a runtime probe.
154    ///
155    /// Intended for test code where the multi-threaded test runner would
156    /// incorrectly fail the probe even though the umask window is benign.
157    ///
158    /// # Safety
159    ///
160    /// The caller must ensure that no concurrent thread creates filesystem
161    /// objects during the `UdsListener::bind` window, or that any such race
162    /// is acceptable in the calling context.
163    pub unsafe fn new_unchecked() -> Self {
164        Self {
165            _no_send: PhantomData,
166        }
167    }
168
169    #[cfg(target_os = "linux")]
170    fn probe() -> io::Result<()> {
171        let mut count: usize = 0;
172        for entry in std::fs::read_dir("/proc/self/task")? {
173            entry?;
174            count += 1;
175            if count > 1 {
176                return Err(io::Error::new(
177                    io::ErrorKind::Other,
178                    "process is multi-threaded; UdsListener::bind changes \
179                     umask(2) process-wide and would race concurrent file creation",
180                ));
181            }
182        }
183        Ok(())
184    }
185
186    #[cfg(target_os = "macos")]
187    fn probe() -> io::Result<()> {
188        extern "C" {
189            // Available in macOS pthread.h since 10.0.
190            // Returns 0 when single-threaded, 1 when multi-threaded.
191            fn pthread_is_threaded_np() -> i32;
192        }
193        // SAFETY: pthread_is_threaded_np is a pure read of a per-process flag
194        // with no side effects and a stable ABI across all macOS versions.
195        if unsafe { pthread_is_threaded_np() } != 0 {
196            return Err(io::Error::new(
197                io::ErrorKind::Other,
198                "process is multi-threaded; UdsListener::bind changes \
199                 umask(2) process-wide and would race concurrent file creation",
200            ));
201        }
202        Ok(())
203    }
204
205    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
206    fn probe() -> io::Result<()> {
207        // No per-platform thread-count probe is implemented here.
208        // The type-level guarantee — UdsListener::bind requires a
209        // &PreThreadAttestation that can only be soundly constructed before
210        // the first thread spawn — remains the primary enforcement.
211        Ok(())
212    }
213}
214
215/// RAII guard that restores the process umask on drop, even if a panic
216/// unwinds through the bind path.
217struct UmaskGuard(u32);
218
219impl Drop for UmaskGuard {
220    fn drop(&mut self) {
221        unsafe {
222            umask(self.0);
223        }
224    }
225}
226
227use crate::peer_cred::{self, RecvResult};
228
229/// Per-listener operator trust declaration for recovery eligibility.
230///
231/// Passed to a listener's builder method to promote UDP-origin beats from
232/// [`BeatOrigin::NetworkUnverified`] to [`BeatOrigin::OperatorAttestedTransport`].
233/// Recovery commands will then fire for stalls on that listener, as they
234/// would for kernel-attested UDS beats.
235///
236/// This is a structural enforcement of per-listener trust — there is no
237/// daemon-wide way to grant recovery trust to a UDP listener.
238#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
239pub enum TransportTrust {
240    /// No operator declaration — beats from this listener are stamped
241    /// [`BeatOrigin::NetworkUnverified`]. Recovery is refused for stalls.
242    #[default]
243    Untrusted,
244    /// Operator has explicitly accepted the security risk for this listener.
245    /// Beats are stamped [`BeatOrigin::OperatorAttestedTransport`] so the
246    /// runtime recovery gate allows them to fire.
247    Operator,
248}
249
250/// Abstraction over a source that can receive 32-byte VLP frames.
251///
252/// Implementations must be `Send + 'static` so [`Observer`] can be moved.
253///
254/// [`Observer`]: crate::Observer
255pub trait BeatListener: Send + 'static {
256    /// Receive one datagram. Returns the same `RecvResult` as
257    /// `peer_cred::recv_authenticated` — callers see `Authenticated`,
258    /// `WouldBlock`, `ShortRead`, or `IoError`.
259    fn recv(&mut self) -> RecvResult;
260
261    /// Drain and reset the AEAD decryption failure counter.
262    ///
263    /// The default implementation returns 0 — only listeners that perform
264    /// authenticated decryption will override this.
265    fn drain_decrypt_failures(&mut self) -> u64 {
266        0
267    }
268
269    /// Drain and reset the truncated-datagram counter.
270    fn drain_truncated(&mut self) -> u64 {
271        0
272    }
273
274    /// Drain and reset the sender-state-full counter.
275    ///
276    /// Incremented when the per-sender replay map is at capacity and a
277    /// stale-sender sweep fails to free space, forcing eviction of the
278    /// oldest entry. Only listeners that maintain per-sender state will
279    /// override this.
280    fn drain_sender_state_full(&mut self) -> u64 {
281        0
282    }
283
284    /// Drain and reset the AEAD-decryption-attempt counter.
285    ///
286    /// Counted by listeners that trial every loaded key on every frame
287    /// without early-exit on success — the constant-trial-count poll that
288    /// closes the key-rotation timing side-channel. Only the secure-UDP
289    /// listener overrides this.
290    fn drain_aead_attempts(&mut self) -> u64 {
291        0
292    }
293}
294
295/// Unix Domain Socket listener for local IPC.
296///
297/// Created via [`UdsListener::bind`] and used as the default backend for
298/// [`Observer::bind`].
299///
300/// [`Observer::bind`]: crate::Observer::bind
301pub struct UdsListener {
302    sock: UnixDatagram,
303    path: PathBuf,
304    bound_dev: u64,
305    bound_ino: u64,
306    truncated_count: u64,
307    /// Effective `SO_RCVBUF` granted by the kernel (may be less than
308    /// requested due to `net.core.rmem_max`).  `0` means no tuning was
309    /// attempted (`--uds-rcvbuf-bytes 0`).
310    rcvbuf_bytes: u32,
311}
312
313impl UdsListener {
314    /// Bind a Unix datagram socket at `path` and return a [`UdsListener`].
315    ///
316    /// The socket file permissions are set to `socket_mode` (octal, e.g.
317    /// `0o600`) after a successful bind. Credential passing is enabled on
318    /// the socket so that `recv` can verify the PID of every sender against
319    /// the kernel's `SO_PASSCRED` / `LOCAL_CREDS` attestation.
320    ///
321    /// If a genuine stale socket exists at `path` (a socket inode with no
322    /// listener), it is cleaned up and the bind succeeds. If another
323    /// process is already listening at `path`, or if the path is occupied by
324    /// a non-socket file, the call fails with `AddrInUse`.
325    ///
326    /// The socket is given a read timeout so `recv` cannot block
327    /// indefinitely.
328    pub fn bind(
329        path: impl AsRef<Path>,
330        socket_mode: u32,
331        read_timeout: Duration,
332        uds_rcvbuf_bytes: u32,
333        _pre_thread: &PreThreadAttestation,
334    ) -> io::Result<Self> {
335        let path = path.as_ref();
336        let owned_path: PathBuf = path.to_path_buf();
337
338        let restrict_umask = !socket_mode & 0o777;
339        let _umask_guard = UmaskGuard(unsafe { umask(restrict_umask) });
340        let bind_result = UnixDatagram::bind(path);
341        let sock = match bind_result {
342            Ok(sock) => sock,
343            Err(e) if e.kind() == ErrorKind::AddrInUse => {
344                let PathOccupant::Socket(stale_socket) = path_occupant(path)? else {
345                    return Err(io::Error::new(
346                        ErrorKind::AddrInUse,
347                        format!(
348                            "cannot bind observer socket at {}: path exists and is not a socket",
349                            path.display()
350                        ),
351                    ));
352                };
353
354                match probe_live(path) {
355                    Ok(true) => {
356                        return Err(io::Error::new(
357                            ErrorKind::AddrInUse,
358                            format!(
359                                "another varta-watch is already running at {}",
360                                path.display()
361                            ),
362                        ));
363                    }
364                    Ok(false) => {
365                        match path_occupant(path)? {
366                            PathOccupant::Socket(current) if current == stale_socket => {
367                                std::fs::remove_file(path)?;
368                            }
369                            PathOccupant::Missing => {}
370                            PathOccupant::Socket(_) => {
371                                return Err(io::Error::new(
372                                    ErrorKind::AddrInUse,
373                                    format!(
374                                        "observer socket path changed while probing {}; retry bind",
375                                        path.display()
376                                    ),
377                                ));
378                            }
379                            PathOccupant::Other => {
380                                return Err(io::Error::new(
381                                    ErrorKind::AddrInUse,
382                                    format!(
383                                        "cannot bind observer socket at {}: path exists and is not a socket",
384                                        path.display()
385                                    ),
386                                ));
387                            }
388                        }
389                        let _umask_guard = UmaskGuard(unsafe { umask(restrict_umask) });
390                        let sock = UnixDatagram::bind(path)?;
391                        std::fs::set_permissions(
392                            path,
393                            std::fs::Permissions::from_mode(socket_mode),
394                        )?;
395                        return Self::finish_bind(sock, owned_path, read_timeout, uds_rcvbuf_bytes);
396                    }
397                    Err(e) => {
398                        return Err(io::Error::new(
399                            e.kind(),
400                            format!("cannot probe socket at {}: {e}", path.display()),
401                        ));
402                    }
403                }
404            }
405            Err(e) => return Err(e),
406        };
407
408        std::fs::set_permissions(path, std::fs::Permissions::from_mode(socket_mode))?;
409        Self::finish_bind(sock, owned_path, read_timeout, uds_rcvbuf_bytes)
410    }
411
412    fn finish_bind(
413        sock: UnixDatagram,
414        path: PathBuf,
415        read_timeout: Duration,
416        uds_rcvbuf_bytes: u32,
417    ) -> io::Result<Self> {
418        use std::os::unix::fs::MetadataExt;
419
420        sock.set_read_timeout(Some(read_timeout))?;
421        let raw_fd = sock.as_raw_fd();
422        peer_cred::enable_credential_passing(raw_fd)?;
423
424        let meta = std::fs::metadata(&path)?;
425        let bound_dev = meta.dev();
426        let bound_ino = meta.ino();
427
428        // Fsync the parent directory so the unlink+bind+chmod sequence is
429        // durable across power loss or an unclean shutdown.  The bind has
430        // already succeeded — a directory-fsync failure is treated as a soft
431        // durability degradation rather than a startup failure (some exotic
432        // platforms return EINVAL for directory fsync).
433        if let Err(e) = fsync_parent_dir(&path) {
434            crate::varta_warn!(
435                "uds bind: parent-directory fsync failed (durability degraded): {e}"
436            );
437            DIR_FSYNC_FAILED.fetch_add(1, Ordering::Relaxed);
438        }
439
440        let granted_rcvbuf = if uds_rcvbuf_bytes > 0 {
441            set_rcvbuf(raw_fd, uds_rcvbuf_bytes).unwrap_or(0)
442        } else {
443            0
444        };
445
446        Ok(UdsListener {
447            sock,
448            path,
449            bound_dev,
450            bound_ino,
451            truncated_count: 0,
452            rcvbuf_bytes: granted_rcvbuf,
453        })
454    }
455}
456
457impl BeatListener for UdsListener {
458    fn recv(&mut self) -> RecvResult {
459        match peer_cred::recv_authenticated(self.sock.as_raw_fd()) {
460            RecvResult::ShortRead => {
461                self.truncated_count = self.truncated_count.wrapping_add(1);
462                RecvResult::ShortRead
463            }
464            other => other,
465        }
466    }
467
468    fn drain_truncated(&mut self) -> u64 {
469        let n = self.truncated_count;
470        self.truncated_count = 0;
471        n
472    }
473}
474
475impl Drop for UdsListener {
476    fn drop(&mut self) {
477        use std::os::unix::fs::MetadataExt;
478        if let Ok(meta) = std::fs::metadata(&self.path) {
479            if meta.dev() == self.bound_dev && meta.ino() == self.bound_ino {
480                let _ = std::fs::remove_file(&self.path);
481            }
482        }
483    }
484}
485
486impl UdsListener {
487    /// Effective `SO_RCVBUF` size granted by the kernel for this socket,
488    /// in bytes.  `0` if `--uds-rcvbuf-bytes 0` was used or tuning failed.
489    pub fn rcvbuf_bytes(&self) -> u32 {
490        self.rcvbuf_bytes
491    }
492}
493
494/// Fsync the directory containing `path` so the unlink+bind+chmod sequence
495/// is durable across power loss.  Uses `sync_all` (`fsync(2)`) rather than
496/// `sync_data` (`fdatasync(2)`) because directory entries are metadata and
497/// `fdatasync` is not guaranteed to flush them.
498fn fsync_parent_dir(path: &Path) -> io::Result<()> {
499    let parent = path.parent().unwrap_or_else(|| Path::new("."));
500    let dir = std::fs::File::open(parent)?;
501    dir.sync_all()
502}
503
504/// Set and read back `SO_RCVBUF` on `fd`.  Returns the kernel-granted size
505/// (which Linux doubles then clamps to `net.core.rmem_max`).  Fails soft on
506/// `EPERM` (unprivileged observer, low `rmem_max`).
507fn set_rcvbuf(fd: i32, bytes: u32) -> io::Result<u32> {
508    use core::ffi::c_void;
509    use core::mem;
510
511    let val = bytes as i32;
512    let ret = unsafe {
513        setsockopt(
514            fd,
515            SOL_SOCKET,
516            SO_RCVBUF,
517            &val as *const i32 as *const c_void,
518            mem::size_of::<i32>() as u32,
519        )
520    };
521    if ret != 0 {
522        return Err(io::Error::last_os_error());
523    }
524    // Read back the effective value (kernel may grant double the requested).
525    let mut granted: i32 = 0;
526    let mut optlen = mem::size_of::<i32>() as u32;
527    let ret = unsafe {
528        getsockopt(
529            fd,
530            SOL_SOCKET,
531            SO_RCVBUF,
532            &mut granted as *mut i32 as *mut c_void,
533            &mut optlen,
534        )
535    };
536    if ret != 0 {
537        return Err(io::Error::last_os_error());
538    }
539    Ok(granted.max(0) as u32)
540}
541
542#[derive(Clone, Copy, Eq, PartialEq)]
543struct SocketIdentity {
544    dev: u64,
545    ino: u64,
546}
547
548enum PathOccupant {
549    Missing,
550    Socket(SocketIdentity),
551    Other,
552}
553
554fn path_occupant(path: &Path) -> io::Result<PathOccupant> {
555    use std::os::unix::fs::{FileTypeExt, MetadataExt};
556
557    match std::fs::symlink_metadata(path) {
558        Ok(meta) if meta.file_type().is_socket() => Ok(PathOccupant::Socket(SocketIdentity {
559            dev: meta.dev(),
560            ino: meta.ino(),
561        })),
562        Ok(_) => Ok(PathOccupant::Other),
563        Err(e) if e.kind() == ErrorKind::NotFound => Ok(PathOccupant::Missing),
564        Err(e) => Err(e),
565    }
566}
567
568/// Probe whether a live listener is accepting datagrams at `path`.
569fn probe_live(path: &Path) -> io::Result<bool> {
570    let sock = UnixDatagram::unbound()?;
571
572    if let Err(e) = sock.connect(path) {
573        return match e.kind() {
574            ErrorKind::PermissionDenied => Err(e),
575            _ => Ok(false),
576        };
577    }
578
579    match sock.send(&[]) {
580        Ok(_) => Ok(true),
581        Err(e) if e.kind() == ErrorKind::PermissionDenied => Err(e),
582        Err(_) => Ok(false),
583    }
584}
585
586// ---------------------------------------------------------------------------
587// Plaintext UDP listener (feature-gated behind `unsafe-plaintext-udp`)
588//
589// This transport has NO cryptographic authentication.  It is exposed only
590// when the operator opts in at *both* compile time (Cargo feature flag whose
591// name starts with `unsafe-`) and runtime (`--i-accept-plaintext-udp`).  In
592// any other configuration the plaintext path is structurally unreachable.
593// ---------------------------------------------------------------------------
594
595#[cfg(feature = "unsafe-plaintext-udp")]
596mod udp_impl {
597    use std::io;
598    use std::net::{SocketAddr, UdpSocket};
599
600    use crate::peer_cred::{BeatOrigin, RecvResult};
601
602    use super::BeatListener;
603
604    /// UDP listener for network-based observers.
605    ///
606    /// Receives 32-byte VLP frames over UDP from remote agents. Created via
607    /// [`UdpListener::bind`] and used with [`Observer::from_listener`].
608    ///
609    /// # Security: no authentication
610    ///
611    /// Plain UDP has NO cryptographic authentication — any device on the
612    /// local subnet can inject arbitrary frames. Frame-level PID
613    /// verification is impossible because UDP lacks kernel credential
614    /// attestation (`peer_pid` is always 0). **Do not use this transport in
615    /// production without network segmentation (firewall, VPC) that limits
616    /// which hosts can reach the observer port.**
617    ///
618    /// For authenticated transport, see [`SecureUdpListener`], which provides
619    /// ChaCha20-Poly1305 AEAD per-agent and/or per-epoch master-key decryption
620    /// behind the `secure-udp` feature flag.
621    ///
622    /// The observer emits a startup warning via stderr whenever plaintext UDP
623    /// is in use.
624    ///
625    /// [`Observer::from_listener`]: crate::Observer::from_listener
626    /// [`SecureUdpListener`]: crate::secure_listener::SecureUdpListener
627    pub struct UdpListener {
628        sock: UdpSocket,
629        truncated_count: u64,
630        recovery_trust: super::TransportTrust,
631    }
632
633    impl UdpListener {
634        /// Bind a non-blocking UDP socket on `addr` and return a [`UdpListener`].
635        ///
636        /// # Errors
637        ///
638        /// Returns an [`io::Error`] if the socket cannot be bound or switched
639        /// to non-blocking mode.
640        pub fn bind(addr: SocketAddr) -> io::Result<Self> {
641            let sock = UdpSocket::bind(addr)?;
642            sock.set_nonblocking(true)?;
643            Ok(UdpListener {
644                sock,
645                truncated_count: 0,
646                recovery_trust: super::TransportTrust::Untrusted,
647            })
648        }
649
650        /// Declare this listener recovery-eligible.
651        ///
652        /// When `trust` is [`TransportTrust::Operator`], beats received on
653        /// this listener are stamped [`BeatOrigin::OperatorAttestedTransport`]
654        /// so the runtime recovery gate allows them to fire.
655        pub fn with_recovery_trust(mut self, trust: super::TransportTrust) -> Self {
656            self.recovery_trust = trust;
657            self
658        }
659    }
660
661    impl BeatListener for UdpListener {
662        fn recv(&mut self) -> RecvResult {
663            let mut buf = [0u8; 32];
664            let origin = match self.recovery_trust {
665                super::TransportTrust::Operator => BeatOrigin::OperatorAttestedTransport,
666                super::TransportTrust::Untrusted => BeatOrigin::NetworkUnverified,
667            };
668            loop {
669                match self.sock.recv(&mut buf) {
670                    Ok(32) => {
671                        return RecvResult::Authenticated {
672                            peer_pid: 0,
673                            peer_uid: 0,
674                            // UDP carries no kernel-attested namespace identity.
675                            peer_pid_ns_inode: None,
676                            origin,
677                            data: buf,
678                        };
679                    }
680                    Ok(_) => {
681                        self.truncated_count = self.truncated_count.wrapping_add(1);
682                        continue;
683                    }
684                    Err(e) => match e.kind() {
685                        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
686                            return RecvResult::WouldBlock;
687                        }
688                        io::ErrorKind::Interrupted => continue,
689                        _ => return RecvResult::IoError(e),
690                    },
691                }
692            }
693        }
694
695        fn drain_truncated(&mut self) -> u64 {
696            let n = self.truncated_count;
697            self.truncated_count = 0;
698            n
699        }
700    }
701}
702
703#[cfg(feature = "unsafe-plaintext-udp")]
704pub use udp_impl::UdpListener;