Skip to main content

secure_exec_kernel/
socket_table.rs

1use crate::poll::{PollEvents, POLLERR, POLLHUP, POLLIN, POLLOUT};
2use crate::vfs::normalize_path;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::error::Error;
5use std::fmt;
6use std::net::{Ipv4Addr, Ipv6Addr};
7use std::sync::{Arc, Mutex, MutexGuard};
8
9pub type SocketId = u64;
10pub type SocketResult<T> = Result<T, SocketTableError>;
11
/// A host/port pair identifying an INET socket endpoint.
///
/// Equality and ordering are derived field by field (`host` first, then
/// `port`), which lets the type serve as a `BTreeMap` key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct InetSocketAddress {
    host: String,
    port: u16,
}

impl InetSocketAddress {
    /// Builds an address from anything convertible into an owned host string.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host = host.into();
        InetSocketAddress { host, port }
    }

    /// Borrows the host component exactly as it was stored.
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Returns the port component.
    pub const fn port(&self) -> u16 {
        self.port
    }
}
34
/// Domain half of a [`SocketSpec`].
///
/// NOTE(review): the variant names suggest IPv4 (`Inet`), IPv6 (`Inet6`) and
/// Unix-domain (`Unix`) sockets; this file does not spell that out — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SocketDomain {
    Inet,
    Inet6,
    Unix,
}
41
/// Type half of a [`SocketSpec`]: stream or datagram.
///
/// `SocketSpec::tcp`/`unix_stream` use `Stream`; `SocketSpec::udp`/
/// `unix_datagram` use `Datagram`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SocketType {
    Stream,
    Datagram,
}
47
/// Lifecycle state tracked for every socket record.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SocketState {
    Created,
    Bound,
    Listening,
    Connected,
}

impl SocketState {
    /// `true` only for [`SocketState::Listening`].
    pub const fn counts_as_listener(self) -> bool {
        match self {
            Self::Listening => true,
            Self::Created | Self::Bound | Self::Connected => false,
        }
    }

    /// `true` only for [`SocketState::Connected`].
    pub const fn counts_as_connection(self) -> bool {
        match self {
            Self::Connected => true,
            Self::Created | Self::Bound | Self::Listening => false,
        }
    }
}
65
/// Selector for which direction(s) of a socket a shutdown request targets.
///
/// NOTE(review): the code that consumes this enum is not visible in this part
/// of the file; semantics are inferred from the variant names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SocketShutdown {
    Read,
    Write,
    Both,
}
72
/// Boolean options that `SocketTable::set_datagram_socket_option` can toggle.
///
/// Each variant maps onto the same-named flag of the socket's datagram state
/// (`reuse_addr`, `reuse_port`, `broadcast`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatagramSocketOption {
    ReuseAddr,
    ReusePort,
    Broadcast,
}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct SocketSpec {
82    pub domain: SocketDomain,
83    pub socket_type: SocketType,
84}
85
86impl SocketSpec {
87    pub const fn new(domain: SocketDomain, socket_type: SocketType) -> Self {
88        Self {
89            domain,
90            socket_type,
91        }
92    }
93
94    pub const fn tcp() -> Self {
95        Self::new(SocketDomain::Inet, SocketType::Stream)
96    }
97
98    pub const fn udp() -> Self {
99        Self::new(SocketDomain::Inet, SocketType::Datagram)
100    }
101
102    pub const fn unix_stream() -> Self {
103        Self::new(SocketDomain::Unix, SocketType::Stream)
104    }
105
106    pub const fn unix_datagram() -> Self {
107        Self::new(SocketDomain::Unix, SocketType::Datagram)
108    }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct SocketRecord {
113    id: SocketId,
114    owner_pid: u32,
115    spec: SocketSpec,
116    state: SocketState,
117    local_address: Option<InetSocketAddress>,
118    peer_address: Option<InetSocketAddress>,
119    local_unix_path: Option<String>,
120    peer_unix_path: Option<String>,
121    listener_state: Option<ListenerState>,
122    connection_state: Option<ConnectionState>,
123    datagram_state: Option<DatagramState>,
124}
125
126impl SocketRecord {
127    pub const fn id(&self) -> SocketId {
128        self.id
129    }
130
131    pub const fn owner_pid(&self) -> u32 {
132        self.owner_pid
133    }
134
135    pub const fn spec(&self) -> SocketSpec {
136        self.spec
137    }
138
139    pub const fn state(&self) -> SocketState {
140        self.state
141    }
142
143    pub fn local_address(&self) -> Option<&InetSocketAddress> {
144        self.local_address.as_ref()
145    }
146
147    pub fn peer_address(&self) -> Option<&InetSocketAddress> {
148        self.peer_address.as_ref()
149    }
150
151    pub fn local_unix_path(&self) -> Option<&str> {
152        self.local_unix_path.as_deref()
153    }
154
155    pub fn peer_unix_path(&self) -> Option<&str> {
156        self.peer_unix_path.as_deref()
157    }
158
159    pub fn listen_backlog(&self) -> Option<usize> {
160        self.listener_state.as_ref().map(|state| state.backlog)
161    }
162
163    pub fn pending_accept_count(&self) -> usize {
164        self.listener_state
165            .as_ref()
166            .map(|state| state.pending_accepts.len())
167            .unwrap_or(0)
168    }
169
170    pub fn peer_socket_id(&self) -> Option<SocketId> {
171        self.connection_state
172            .as_ref()
173            .and_then(|state| state.peer_socket_id)
174    }
175
176    pub fn buffered_read_bytes(&self) -> usize {
177        self.connection_state
178            .as_ref()
179            .map(|state| state.recv_buffer.len())
180            .unwrap_or(0)
181    }
182
183    pub fn read_shutdown(&self) -> bool {
184        self.connection_state
185            .as_ref()
186            .map(|state| state.read_shutdown)
187            .unwrap_or(false)
188    }
189
190    pub fn write_shutdown(&self) -> bool {
191        self.connection_state
192            .as_ref()
193            .map(|state| state.write_shutdown)
194            .unwrap_or(false)
195    }
196
197    pub fn peer_write_shutdown(&self) -> bool {
198        self.connection_state
199            .as_ref()
200            .map(|state| state.peer_write_shutdown)
201            .unwrap_or(false)
202    }
203
204    pub fn queued_datagrams(&self) -> usize {
205        self.datagram_state
206            .as_ref()
207            .map(|state| state.recv_queue.len())
208            .unwrap_or(0)
209    }
210
211    pub fn queued_datagram_bytes(&self) -> usize {
212        self.datagram_state
213            .as_ref()
214            .map(|state| datagram_queue_bytes(&state.recv_queue))
215            .unwrap_or(0)
216    }
217
218    pub fn reuse_address(&self) -> bool {
219        self.datagram_state
220            .as_ref()
221            .map(|state| state.reuse_addr)
222            .unwrap_or(false)
223    }
224
225    pub fn reuse_port(&self) -> bool {
226        self.datagram_state
227            .as_ref()
228            .map(|state| state.reuse_port)
229            .unwrap_or(false)
230    }
231
232    pub fn broadcast_enabled(&self) -> bool {
233        self.datagram_state
234            .as_ref()
235            .map(|state| state.broadcast)
236            .unwrap_or(false)
237    }
238
239    pub fn multicast_membership_count(&self) -> usize {
240        self.datagram_state
241            .as_ref()
242            .map(|state| state.multicast_memberships.len())
243            .unwrap_or(0)
244    }
245
246    pub fn has_multicast_membership(&self, membership: &SocketMulticastMembership) -> bool {
247        self.datagram_state
248            .as_ref()
249            .map(|state| state.multicast_memberships.contains(membership))
250            .unwrap_or(false)
251    }
252}
253
254#[derive(Debug, Clone, PartialEq, Eq)]
255pub struct ReceivedDatagram {
256    source_address: Option<InetSocketAddress>,
257    payload: Vec<u8>,
258}
259
260impl ReceivedDatagram {
261    pub fn source_address(&self) -> Option<&InetSocketAddress> {
262        self.source_address.as_ref()
263    }
264
265    pub fn payload(&self) -> &[u8] {
266        &self.payload
267    }
268
269    pub fn into_parts(self) -> (Option<InetSocketAddress>, Vec<u8>) {
270        (self.source_address, self.payload)
271    }
272}
273
/// Plain aggregate counters describing a socket table; every field is zero in
/// the derived `Default`.
///
/// NOTE(review): the code that fills this in is not visible in this part of
/// the file; field meanings are inferred from their names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SocketTableSnapshot {
    pub sockets: usize,
    pub listeners: usize,
    pub connections: usize,
    pub buffered_bytes: usize,
    pub datagram_queue_len: usize,
}
282
/// A multicast group address plus an optional interface address.
///
/// The derived ordering lets it key the table's `BTreeMap`/`BTreeSet` indexes.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SocketMulticastMembership {
    group_address: String,
    interface_address: Option<String>,
}

impl SocketMulticastMembership {
    /// Builds a membership; `group_address` is converted into an owned string.
    pub fn new(group_address: impl Into<String>, interface_address: Option<String>) -> Self {
        let group_address = group_address.into();
        SocketMulticastMembership {
            group_address,
            interface_address,
        }
    }

    /// Borrows the group address.
    pub fn group_address(&self) -> &str {
        self.group_address.as_str()
    }

    /// Borrows the interface address, if one was given.
    pub fn interface_address(&self) -> Option<&str> {
        self.interface_address.as_ref().map(String::as_str)
    }
}
305
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct SocketTableError {
308    code: &'static str,
309    message: String,
310}
311
312impl SocketTableError {
313    pub fn code(&self) -> &'static str {
314        self.code
315    }
316
317    fn not_found(socket_id: SocketId) -> Self {
318        Self {
319            code: "ENOENT",
320            message: format!("no such socket {socket_id}"),
321        }
322    }
323
324    fn invalid_argument(message: impl Into<String>) -> Self {
325        Self {
326            code: "EINVAL",
327            message: message.into(),
328        }
329    }
330
331    fn address_in_use(message: impl Into<String>) -> Self {
332        Self {
333            code: "EADDRINUSE",
334            message: message.into(),
335        }
336    }
337
338    fn address_not_available(message: impl Into<String>) -> Self {
339        Self {
340            code: "EADDRNOTAVAIL",
341            message: message.into(),
342        }
343    }
344
345    fn not_found_address(message: impl Into<String>) -> Self {
346        Self {
347            code: "ECONNREFUSED",
348            message: message.into(),
349        }
350    }
351
352    fn would_block(message: impl Into<String>) -> Self {
353        Self {
354            code: "EAGAIN",
355            message: message.into(),
356        }
357    }
358
359    fn not_connected(message: impl Into<String>) -> Self {
360        Self {
361            code: "ENOTCONN",
362            message: message.into(),
363        }
364    }
365
366    fn broken_pipe(message: impl Into<String>) -> Self {
367        Self {
368            code: "EPIPE",
369            message: message.into(),
370        }
371    }
372}
373
374impl fmt::Display for SocketTableError {
375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376        write!(f, "{}: {}", self.code, self.message)
377    }
378}
379
380impl Error for SocketTableError {}
381
/// All mutable state of a `SocketTable`, kept behind a single mutex.
#[derive(Debug, Default)]
struct SocketTableState {
    // Primary store: every socket record keyed by its id.
    sockets: BTreeMap<SocketId, SocketRecord>,
    // Owner pid -> ids of its sockets; populated on allocate/accept.
    by_owner: BTreeMap<u32, BTreeSet<SocketId>>,
    // INET stream address -> socket id; consulted by connect_to_bound_inet_stream.
    // NOTE(review): presumably filled by register_bound_inet_socket — confirm.
    bound_inet_streams: BTreeMap<InetSocketAddress, SocketId>,
    // INET datagram address -> set of socket ids.
    // NOTE(review): maintained by helpers not visible here — confirm.
    bound_inet_datagrams: BTreeMap<InetSocketAddress, BTreeSet<SocketId>>,
    // Normalized Unix path -> socket id; inserted by bind_unix.
    bound_unix_streams: BTreeMap<String, SocketId>,
    // Membership -> member socket ids; maintained by add/drop_multicast_membership.
    multicast_groups: BTreeMap<SocketMulticastMembership, BTreeSet<SocketId>>,
    // Id counter handed to the next_socket_id helper.
    next_socket_id: SocketId,
}
392
/// Listener-specific data attached to a socket by `listen`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ListenerState {
    // Maximum length of `pending_accepts`; enqueueing fails with EAGAIN once reached.
    backlog: usize,
    // FIFO of connections waiting for `accept` (pushed at the back, popped at the front).
    pending_accepts: VecDeque<PendingConnection>,
}
398
/// Stream-connection data attached to a connected socket.
///
/// The derived `Default` is: no peer, empty buffer, all flags `false`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct ConnectionState {
    // Id of the in-table socket at the other end, when the peer is local
    // (set by connect_pair / connect_to_bound_inet_stream).
    peer_socket_id: Option<SocketId>,
    // Bytes received but not yet read.
    recv_buffer: VecDeque<u8>,
    // NOTE(review): the three flags are only read through SocketRecord accessors
    // in the visible code; where they are set is outside this view — confirm.
    read_shutdown: bool,
    write_shutdown: bool,
    peer_write_shutdown: bool,
}
407
/// One entry in a listener's accept queue.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PendingConnection {
    // Peer INET address copied onto the accepted socket.
    peer_address: Option<InetSocketAddress>,
    // Peer Unix path copied onto the accepted socket.
    peer_unix_path: Option<String>,
    // When `Some`, `accept` returns that already-existing socket instead of
    // allocating a new record.
    accepted_socket_id: Option<SocketId>,
}
414
/// Datagram-specific data attached to a socket.
///
/// The derived `Default` is: empty queue, all options off, no memberships.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct DatagramState {
    // Datagrams received but not yet read.
    recv_queue: VecDeque<QueuedDatagram>,
    // Option flags toggled via set_datagram_socket_option.
    reuse_addr: bool,
    reuse_port: bool,
    broadcast: bool,
    // Memberships added/removed by add/drop_multicast_membership.
    multicast_memberships: BTreeSet<SocketMulticastMembership>,
}
423
/// A datagram held in a socket's receive queue.
///
/// Has the same fields as the public `ReceivedDatagram`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueuedDatagram {
    // Sender address, when known.
    source_address: Option<InetSocketAddress>,
    // Raw payload bytes.
    payload: Vec<u8>,
}
429
/// Shared interior of a `SocketTable`: the table state behind one mutex.
#[derive(Debug, Default)]
struct SocketTableInner {
    // Every SocketTable operation locks this before touching the state.
    state: Mutex<SocketTableState>,
}
434
/// Handle to a socket table.
///
/// The handle wraps an `Arc`, so the derived `Clone` produces another handle
/// to the same underlying state rather than a copy of the table.
#[derive(Debug, Clone, Default)]
pub struct SocketTable {
    // Shared, mutex-protected table state.
    inner: Arc<SocketTableInner>,
}
439
440impl SocketTable {
441    pub fn new() -> Self {
442        Self::default()
443    }
444
445    pub fn allocate(&self, owner_pid: u32, spec: SocketSpec) -> SocketRecord {
446        self.allocate_with_state(owner_pid, spec, SocketState::Created)
447    }
448
449    pub fn allocate_with_state(
450        &self,
451        owner_pid: u32,
452        spec: SocketSpec,
453        state: SocketState,
454    ) -> SocketRecord {
455        let mut table = lock_or_recover(&self.inner.state);
456        let socket_id = next_socket_id(&mut table);
457        let record = SocketRecord {
458            id: socket_id,
459            owner_pid,
460            spec,
461            state,
462            local_address: None,
463            peer_address: None,
464            local_unix_path: None,
465            peer_unix_path: None,
466            listener_state: None,
467            connection_state: default_connection_state(spec, state),
468            datagram_state: default_datagram_state(spec),
469        };
470        table.sockets.insert(socket_id, record.clone());
471        table
472            .by_owner
473            .entry(owner_pid)
474            .or_default()
475            .insert(socket_id);
476        record
477    }
478
479    pub fn get(&self, socket_id: SocketId) -> Option<SocketRecord> {
480        lock_or_recover(&self.inner.state)
481            .sockets
482            .get(&socket_id)
483            .cloned()
484    }
485
486    pub fn update_state(
487        &self,
488        socket_id: SocketId,
489        new_state: SocketState,
490    ) -> SocketResult<SocketRecord> {
491        let mut table = lock_or_recover(&self.inner.state);
492        let record = table
493            .sockets
494            .get_mut(&socket_id)
495            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
496        validate_state_transition(record.state, new_state)?;
497        record.state = new_state;
498        if new_state != SocketState::Listening {
499            record.listener_state = None;
500        }
501        if new_state == SocketState::Connected && supports_connection_lifecycle(record.spec) {
502            record
503                .connection_state
504                .get_or_insert_with(ConnectionState::default);
505        } else if new_state != SocketState::Connected {
506            record.connection_state = None;
507        }
508        Ok(record.clone())
509    }
510
511    pub fn bind_inet(
512        &self,
513        socket_id: SocketId,
514        address: InetSocketAddress,
515    ) -> SocketResult<SocketRecord> {
516        let address = normalize_inet_address(address);
517        let mut table = lock_or_recover(&self.inner.state);
518        let existing = table
519            .sockets
520            .get(&socket_id)
521            .cloned()
522            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
523        if !supports_inet_bind(existing.spec) {
524            return Err(SocketTableError::invalid_argument(format!(
525                "socket {socket_id} is not an INET socket"
526            )));
527        }
528        let conflicting_ids =
529            lookup_conflicting_bound_inet_socket_ids(&table, existing.spec, &address);
530        if has_incompatible_inet_bind_conflict(&table, &existing, &conflicting_ids) {
531            return Err(SocketTableError::address_in_use(format!(
532                "address {}:{} is already bound",
533                address.host(),
534                address.port()
535            )));
536        }
537        let cloned = {
538            let record = table
539                .sockets
540                .get_mut(&socket_id)
541                .ok_or_else(|| SocketTableError::not_found(socket_id))?;
542
543            match record.state {
544                SocketState::Created => {}
545                SocketState::Bound if record.local_address.as_ref() == Some(&address) => {
546                    return Ok(record.clone());
547                }
548                SocketState::Bound | SocketState::Listening | SocketState::Connected => {
549                    return Err(SocketTableError::invalid_argument(format!(
550                        "socket {socket_id} cannot bind in state {:?}",
551                        record.state
552                    )));
553                }
554            }
555
556            record.local_address = Some(address.clone());
557            record.peer_address = None;
558            record.local_unix_path = None;
559            record.peer_unix_path = None;
560            record.listener_state = None;
561            record.connection_state = None;
562            record.state = SocketState::Bound;
563            record.clone()
564        };
565        register_bound_inet_socket(&mut table, cloned.spec, address, socket_id);
566        Ok(cloned)
567    }
568
569    pub fn set_datagram_socket_option(
570        &self,
571        socket_id: SocketId,
572        option: DatagramSocketOption,
573        enabled: bool,
574    ) -> SocketResult<SocketRecord> {
575        let mut table = lock_or_recover(&self.inner.state);
576        let record = table
577            .sockets
578            .get_mut(&socket_id)
579            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
580        let datagram_state = datagram_state_mut(record)?;
581
582        match option {
583            DatagramSocketOption::ReuseAddr => datagram_state.reuse_addr = enabled,
584            DatagramSocketOption::ReusePort => datagram_state.reuse_port = enabled,
585            DatagramSocketOption::Broadcast => datagram_state.broadcast = enabled,
586        }
587
588        Ok(record.clone())
589    }
590
591    pub fn add_multicast_membership(
592        &self,
593        socket_id: SocketId,
594        membership: SocketMulticastMembership,
595    ) -> SocketResult<SocketRecord> {
596        let mut table = lock_or_recover(&self.inner.state);
597        let normalized_membership = {
598            let record = table
599                .sockets
600                .get(&socket_id)
601                .ok_or_else(|| SocketTableError::not_found(socket_id))?;
602            validate_multicast_socket(record)?;
603            normalize_multicast_membership(record.spec, membership)?
604        };
605
606        let cloned = {
607            let record = table
608                .sockets
609                .get_mut(&socket_id)
610                .ok_or_else(|| SocketTableError::not_found(socket_id))?;
611            let datagram_state = datagram_state_mut(record)?;
612            datagram_state
613                .multicast_memberships
614                .insert(normalized_membership.clone());
615            record.clone()
616        };
617
618        table
619            .multicast_groups
620            .entry(normalized_membership)
621            .or_default()
622            .insert(socket_id);
623        Ok(cloned)
624    }
625
626    pub fn drop_multicast_membership(
627        &self,
628        socket_id: SocketId,
629        membership: SocketMulticastMembership,
630    ) -> SocketResult<SocketRecord> {
631        let mut table = lock_or_recover(&self.inner.state);
632        let normalized_membership = {
633            let record = table
634                .sockets
635                .get(&socket_id)
636                .ok_or_else(|| SocketTableError::not_found(socket_id))?;
637            validate_multicast_socket(record)?;
638            normalize_multicast_membership(record.spec, membership)?
639        };
640
641        let cloned = {
642            let record = table
643                .sockets
644                .get_mut(&socket_id)
645                .ok_or_else(|| SocketTableError::not_found(socket_id))?;
646            let datagram_state = datagram_state_mut(record)?;
647            if !datagram_state
648                .multicast_memberships
649                .remove(&normalized_membership)
650            {
651                return Err(SocketTableError::address_not_available(format!(
652                    "socket {socket_id} has not joined multicast group {}",
653                    normalized_membership.group_address()
654                )));
655            }
656            record.clone()
657        };
658
659        if let Some(members) = table.multicast_groups.get_mut(&normalized_membership) {
660            members.remove(&socket_id);
661            if members.is_empty() {
662                table.multicast_groups.remove(&normalized_membership);
663            }
664        }
665
666        Ok(cloned)
667    }
668
669    pub fn bind_unix(
670        &self,
671        socket_id: SocketId,
672        path: impl Into<String>,
673    ) -> SocketResult<SocketRecord> {
674        let path = normalize_unix_socket_path(path.into())?;
675        let mut table = lock_or_recover(&self.inner.state);
676        let existing = table
677            .sockets
678            .get(&socket_id)
679            .cloned()
680            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
681        if !supports_unix_stream_lifecycle(existing.spec) {
682            return Err(SocketTableError::invalid_argument(format!(
683                "socket {socket_id} is not a Unix stream socket"
684            )));
685        }
686        let existing_id = table.bound_unix_streams.get(&path).copied();
687        let cloned = {
688            let record = table
689                .sockets
690                .get_mut(&socket_id)
691                .ok_or_else(|| SocketTableError::not_found(socket_id))?;
692
693            if let Some(bound_socket_id) = existing_id {
694                if bound_socket_id != socket_id {
695                    return Err(SocketTableError::address_in_use(format!(
696                        "path {path} is already bound"
697                    )));
698                }
699            }
700
701            match record.state {
702                SocketState::Created => {}
703                SocketState::Bound if record.local_unix_path.as_deref() == Some(path.as_str()) => {
704                    return Ok(record.clone());
705                }
706                SocketState::Bound | SocketState::Listening | SocketState::Connected => {
707                    return Err(SocketTableError::invalid_argument(format!(
708                        "socket {socket_id} cannot bind in state {:?}",
709                        record.state
710                    )));
711                }
712            }
713
714            record.local_address = None;
715            record.peer_address = None;
716            record.local_unix_path = Some(path.clone());
717            record.peer_unix_path = None;
718            record.listener_state = None;
719            record.connection_state = None;
720            record.state = SocketState::Bound;
721            record.clone()
722        };
723        table.bound_unix_streams.insert(path, socket_id);
724        Ok(cloned)
725    }
726
727    pub fn listen(&self, socket_id: SocketId, backlog: usize) -> SocketResult<SocketRecord> {
728        if backlog == 0 {
729            return Err(SocketTableError::invalid_argument(
730                "listener backlog must be greater than zero",
731            ));
732        }
733
734        let mut table = lock_or_recover(&self.inner.state);
735        let record = table
736            .sockets
737            .get_mut(&socket_id)
738            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
739
740        if !supports_listener_lifecycle(record.spec) {
741            return Err(SocketTableError::invalid_argument(format!(
742                "socket {socket_id} is not a stream socket"
743            )));
744        }
745        if record.state != SocketState::Bound || !has_bound_endpoint(record) {
746            return Err(SocketTableError::invalid_argument(format!(
747                "socket {socket_id} must be bound before listen"
748            )));
749        }
750
751        record.state = SocketState::Listening;
752        record.listener_state = Some(ListenerState {
753            backlog,
754            pending_accepts: VecDeque::new(),
755        });
756        Ok(record.clone())
757    }
758
759    pub fn enqueue_incoming_tcp_connection(
760        &self,
761        listener_socket_id: SocketId,
762        peer_address: InetSocketAddress,
763    ) -> SocketResult<()> {
764        let mut table = lock_or_recover(&self.inner.state);
765        let record = table
766            .sockets
767            .get_mut(&listener_socket_id)
768            .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
769
770        if record.state != SocketState::Listening {
771            return Err(SocketTableError::invalid_argument(format!(
772                "socket {listener_socket_id} is not listening"
773            )));
774        }
775
776        let listener_state = record.listener_state.as_mut().ok_or_else(|| {
777            SocketTableError::invalid_argument(format!(
778                "socket {listener_socket_id} has no listener state"
779            ))
780        })?;
781
782        if listener_state.pending_accepts.len() >= listener_state.backlog {
783            return Err(SocketTableError::would_block(format!(
784                "listener {listener_socket_id} backlog is full"
785            )));
786        }
787
788        listener_state.pending_accepts.push_back(PendingConnection {
789            peer_address: Some(peer_address),
790            peer_unix_path: None,
791            accepted_socket_id: None,
792        });
793        Ok(())
794    }
795
796    pub fn accept(&self, listener_socket_id: SocketId) -> SocketResult<SocketRecord> {
797        let mut table = lock_or_recover(&self.inner.state);
798        let (owner_pid, spec, local_address, local_unix_path, pending) = {
799            let record = table
800                .sockets
801                .get_mut(&listener_socket_id)
802                .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
803
804            if record.state != SocketState::Listening {
805                return Err(SocketTableError::invalid_argument(format!(
806                    "socket {listener_socket_id} is not listening"
807                )));
808            }
809
810            let listener_state = record.listener_state.as_mut().ok_or_else(|| {
811                SocketTableError::invalid_argument(format!(
812                    "socket {listener_socket_id} has no listener state"
813                ))
814            })?;
815            let pending = listener_state.pending_accepts.pop_front().ok_or_else(|| {
816                SocketTableError::would_block(format!(
817                    "listener {listener_socket_id} has no pending connections"
818                ))
819            })?;
820
821            (
822                record.owner_pid,
823                record.spec,
824                record.local_address.clone(),
825                record.local_unix_path.clone(),
826                pending,
827            )
828        };
829
830        if let Some(accepted_socket_id) = pending.accepted_socket_id {
831            return table
832                .sockets
833                .get(&accepted_socket_id)
834                .cloned()
835                .ok_or_else(|| SocketTableError::not_found(accepted_socket_id));
836        }
837
838        let socket_id = next_socket_id(&mut table);
839        let record = SocketRecord {
840            id: socket_id,
841            owner_pid,
842            spec,
843            state: SocketState::Connected,
844            local_address,
845            peer_address: pending.peer_address,
846            local_unix_path,
847            peer_unix_path: pending.peer_unix_path,
848            listener_state: None,
849            connection_state: default_connection_state(spec, SocketState::Connected),
850            datagram_state: default_datagram_state(spec),
851        };
852        table.sockets.insert(socket_id, record.clone());
853        table
854            .by_owner
855            .entry(owner_pid)
856            .or_default()
857            .insert(socket_id);
858        Ok(record)
859    }
860
861    pub fn connect_pair(
862        &self,
863        socket_id: SocketId,
864        peer_socket_id: SocketId,
865    ) -> SocketResult<(SocketRecord, SocketRecord)> {
866        if socket_id == peer_socket_id {
867            return Err(SocketTableError::invalid_argument(
868                "socket cannot connect to itself",
869            ));
870        }
871
872        let mut table = lock_or_recover(&self.inner.state);
873        let mut socket = table
874            .sockets
875            .remove(&socket_id)
876            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
877        let Some(mut peer) = table.sockets.remove(&peer_socket_id) else {
878            table.sockets.insert(socket_id, socket);
879            return Err(SocketTableError::not_found(peer_socket_id));
880        };
881
882        if let Err(error) = validate_connect_pair(&socket, &peer) {
883            table.sockets.insert(socket_id, socket);
884            table.sockets.insert(peer_socket_id, peer);
885            return Err(error);
886        }
887
888        socket.state = SocketState::Connected;
889        socket.peer_address = peer.local_address.clone();
890        socket.peer_unix_path = peer.local_unix_path.clone();
891        socket.listener_state = None;
892        socket.connection_state = Some(ConnectionState {
893            peer_socket_id: Some(peer_socket_id),
894            ..ConnectionState::default()
895        });
896
897        peer.state = SocketState::Connected;
898        peer.peer_address = socket.local_address.clone();
899        peer.peer_unix_path = socket.local_unix_path.clone();
900        peer.listener_state = None;
901        peer.connection_state = Some(ConnectionState {
902            peer_socket_id: Some(socket_id),
903            ..ConnectionState::default()
904        });
905
906        let socket_clone = socket.clone();
907        let peer_clone = peer.clone();
908        table.sockets.insert(socket_id, socket);
909        table.sockets.insert(peer_socket_id, peer);
910        Ok((socket_clone, peer_clone))
911    }
912
913    pub fn find_bound_inet_socket(
914        &self,
915        spec: SocketSpec,
916        address: &InetSocketAddress,
917    ) -> Option<SocketRecord> {
918        let address = normalize_inet_address(address.clone());
919        let table = lock_or_recover(&self.inner.state);
920        let socket_id = lookup_bound_inet_socket(&table, spec, &address)?;
921        table.sockets.get(&socket_id).cloned()
922    }
923
924    pub fn connect_to_bound_inet_stream(
925        &self,
926        socket_id: SocketId,
927        target_address: InetSocketAddress,
928    ) -> SocketResult<()> {
929        let target_address = normalize_inet_address(target_address);
930        let mut table = lock_or_recover(&self.inner.state);
931        let listener_socket_id =
932            lookup_bound_inet_socket_in_table(&table.bound_inet_streams, &target_address)
933                .ok_or_else(|| {
934                    SocketTableError::not_found_address(format!(
935                        "no listening socket bound at {}:{}",
936                        target_address.host(),
937                        target_address.port()
938                    ))
939                })?;
940
941        if socket_id == listener_socket_id {
942            return Err(SocketTableError::invalid_argument(
943                "socket cannot connect to its own listening endpoint",
944            ));
945        }
946
947        let mut client = table
948            .sockets
949            .remove(&socket_id)
950            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
951        let accepted_socket_id = next_socket_id(&mut table);
952
953        let result = (|| {
954            let listener = table
955                .sockets
956                .get_mut(&listener_socket_id)
957                .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
958            validate_connect_to_listener(&client, listener)?;
959
960            let listener_state = listener.listener_state.as_mut().ok_or_else(|| {
961                SocketTableError::invalid_argument(format!(
962                    "socket {listener_socket_id} has no listener state"
963                ))
964            })?;
965            if listener_state.pending_accepts.len() >= listener_state.backlog {
966                return Err(SocketTableError::would_block(format!(
967                    "listener {listener_socket_id} backlog is full"
968                )));
969            }
970
971            let accepted = SocketRecord {
972                id: accepted_socket_id,
973                owner_pid: listener.owner_pid,
974                spec: listener.spec,
975                state: SocketState::Connected,
976                local_address: listener.local_address.clone(),
977                peer_address: client.local_address.clone(),
978                local_unix_path: None,
979                peer_unix_path: None,
980                listener_state: None,
981                connection_state: Some(ConnectionState {
982                    peer_socket_id: Some(socket_id),
983                    ..ConnectionState::default()
984                }),
985                datagram_state: default_datagram_state(listener.spec),
986            };
987
988            listener_state.pending_accepts.push_back(PendingConnection {
989                peer_address: client.local_address.clone(),
990                peer_unix_path: None,
991                accepted_socket_id: Some(accepted_socket_id),
992            });
993
994            client.state = SocketState::Connected;
995            client.peer_address = listener.local_address.clone();
996            client.peer_unix_path = None;
997            client.listener_state = None;
998            client.connection_state = Some(ConnectionState {
999                peer_socket_id: Some(accepted_socket_id),
1000                ..ConnectionState::default()
1001            });
1002
1003            Ok(accepted)
1004        })();
1005
1006        match result {
1007            Ok(accepted) => {
1008                table.sockets.insert(socket_id, client);
1009                table.sockets.insert(accepted_socket_id, accepted.clone());
1010                table
1011                    .by_owner
1012                    .entry(accepted.owner_pid)
1013                    .or_default()
1014                    .insert(accepted_socket_id);
1015                Ok(())
1016            }
1017            Err(error) => {
1018                table.sockets.insert(socket_id, client);
1019                Err(error)
1020            }
1021        }
1022    }
1023
1024    pub fn find_bound_unix_socket(&self, path: &str) -> Option<SocketRecord> {
1025        let path = normalize_unix_socket_path(path).ok()?;
1026        let table = lock_or_recover(&self.inner.state);
1027        let socket_id = table.bound_unix_streams.get(&path).copied()?;
1028        table.sockets.get(&socket_id).cloned()
1029    }
1030
1031    pub fn connect_to_bound_unix_stream(
1032        &self,
1033        socket_id: SocketId,
1034        target_path: impl Into<String>,
1035    ) -> SocketResult<()> {
1036        let target_path = normalize_unix_socket_path(target_path.into())?;
1037        let mut table = lock_or_recover(&self.inner.state);
1038        let listener_socket_id = table
1039            .bound_unix_streams
1040            .get(&target_path)
1041            .copied()
1042            .ok_or_else(|| {
1043                SocketTableError::not_found_address(format!(
1044                    "no listening socket bound at path {target_path}"
1045                ))
1046            })?;
1047
1048        if socket_id == listener_socket_id {
1049            return Err(SocketTableError::invalid_argument(
1050                "socket cannot connect to its own listening endpoint",
1051            ));
1052        }
1053
1054        let mut client = table
1055            .sockets
1056            .remove(&socket_id)
1057            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1058        let accepted_socket_id = next_socket_id(&mut table);
1059
1060        let result = (|| {
1061            let listener = table
1062                .sockets
1063                .get_mut(&listener_socket_id)
1064                .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
1065            validate_connect_to_listener(&client, listener)?;
1066
1067            let listener_state = listener.listener_state.as_mut().ok_or_else(|| {
1068                SocketTableError::invalid_argument(format!(
1069                    "socket {listener_socket_id} has no listener state"
1070                ))
1071            })?;
1072            if listener_state.pending_accepts.len() >= listener_state.backlog {
1073                return Err(SocketTableError::would_block(format!(
1074                    "listener {listener_socket_id} backlog is full"
1075                )));
1076            }
1077
1078            let accepted = SocketRecord {
1079                id: accepted_socket_id,
1080                owner_pid: listener.owner_pid,
1081                spec: listener.spec,
1082                state: SocketState::Connected,
1083                local_address: None,
1084                peer_address: None,
1085                local_unix_path: listener.local_unix_path.clone(),
1086                peer_unix_path: client.local_unix_path.clone(),
1087                listener_state: None,
1088                connection_state: Some(ConnectionState {
1089                    peer_socket_id: Some(socket_id),
1090                    ..ConnectionState::default()
1091                }),
1092                datagram_state: default_datagram_state(listener.spec),
1093            };
1094
1095            listener_state.pending_accepts.push_back(PendingConnection {
1096                peer_address: None,
1097                peer_unix_path: client.local_unix_path.clone(),
1098                accepted_socket_id: Some(accepted_socket_id),
1099            });
1100
1101            client.state = SocketState::Connected;
1102            client.peer_address = None;
1103            client.peer_unix_path = listener.local_unix_path.clone();
1104            client.listener_state = None;
1105            client.connection_state = Some(ConnectionState {
1106                peer_socket_id: Some(accepted_socket_id),
1107                ..ConnectionState::default()
1108            });
1109
1110            Ok(accepted)
1111        })();
1112
1113        match result {
1114            Ok(accepted) => {
1115                table.sockets.insert(socket_id, client);
1116                table.sockets.insert(accepted_socket_id, accepted.clone());
1117                table
1118                    .by_owner
1119                    .entry(accepted.owner_pid)
1120                    .or_default()
1121                    .insert(accepted_socket_id);
1122                Ok(())
1123            }
1124            Err(error) => {
1125                table.sockets.insert(socket_id, client);
1126                Err(error)
1127            }
1128        }
1129    }
1130
1131    pub fn send_to_bound_udp_socket(
1132        &self,
1133        socket_id: SocketId,
1134        target_address: InetSocketAddress,
1135        data: &[u8],
1136    ) -> SocketResult<usize> {
1137        let target_address = normalize_inet_address(target_address);
1138        let mut table = lock_or_recover(&self.inner.state);
1139        let sender = table
1140            .sockets
1141            .get(&socket_id)
1142            .cloned()
1143            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1144        validate_bound_udp_sender(&sender)?;
1145
1146        let receiver_socket_id = lookup_bound_inet_datagram_socket_in_table(
1147            &table.bound_inet_datagrams,
1148            &target_address,
1149        )
1150        .ok_or_else(|| {
1151            SocketTableError::not_found_address(format!(
1152                "no UDP socket bound at {}:{}",
1153                target_address.host(),
1154                target_address.port()
1155            ))
1156        })?;
1157        let receiver = table
1158            .sockets
1159            .get_mut(&receiver_socket_id)
1160            .ok_or_else(|| SocketTableError::not_found(receiver_socket_id))?;
1161        validate_bound_udp_receiver(receiver)?;
1162
1163        let datagram_state = receiver.datagram_state.as_mut().ok_or_else(|| {
1164            SocketTableError::invalid_argument(format!(
1165                "socket {receiver_socket_id} does not support datagrams"
1166            ))
1167        })?;
1168        datagram_state.recv_queue.push_back(QueuedDatagram {
1169            source_address: sender.local_address.clone(),
1170            payload: data.to_vec(),
1171        });
1172        Ok(data.len())
1173    }
1174
1175    pub fn check_send_to_bound_udp_socket(
1176        &self,
1177        socket_id: SocketId,
1178        target_address: InetSocketAddress,
1179    ) -> SocketResult<()> {
1180        let target_address = normalize_inet_address(target_address);
1181        let table = lock_or_recover(&self.inner.state);
1182        let sender = table
1183            .sockets
1184            .get(&socket_id)
1185            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1186        validate_bound_udp_sender(sender)?;
1187
1188        let receiver_socket_id = lookup_bound_inet_datagram_socket_in_table(
1189            &table.bound_inet_datagrams,
1190            &target_address,
1191        )
1192        .ok_or_else(|| {
1193            SocketTableError::not_found_address(format!(
1194                "no UDP socket bound at {}:{}",
1195                target_address.host(),
1196                target_address.port()
1197            ))
1198        })?;
1199        let receiver = table
1200            .sockets
1201            .get(&receiver_socket_id)
1202            .ok_or_else(|| SocketTableError::not_found(receiver_socket_id))?;
1203        validate_bound_udp_receiver(receiver)?;
1204        Ok(())
1205    }
1206
1207    pub fn recv_datagram(
1208        &self,
1209        socket_id: SocketId,
1210        max_bytes: usize,
1211    ) -> SocketResult<Option<ReceivedDatagram>> {
1212        let mut table = lock_or_recover(&self.inner.state);
1213        let record = table
1214            .sockets
1215            .get_mut(&socket_id)
1216            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1217        validate_bound_udp_receiver(record)?;
1218
1219        let datagram_state = record.datagram_state.as_mut().ok_or_else(|| {
1220            SocketTableError::invalid_argument(format!(
1221                "socket {socket_id} does not support datagrams"
1222            ))
1223        })?;
1224        let Some(datagram) = datagram_state.recv_queue.pop_front() else {
1225            return Err(SocketTableError::would_block(format!(
1226                "socket {socket_id} has no queued datagrams"
1227            )));
1228        };
1229
1230        let payload = if datagram.payload.len() > max_bytes {
1231            datagram.payload[..max_bytes].to_vec()
1232        } else {
1233            datagram.payload
1234        };
1235        Ok(Some(ReceivedDatagram {
1236            source_address: datagram.source_address,
1237            payload,
1238        }))
1239    }
1240
1241    pub fn poll(&self, socket_id: SocketId, requested: PollEvents) -> SocketResult<PollEvents> {
1242        let table = lock_or_recover(&self.inner.state);
1243        let record = table
1244            .sockets
1245            .get(&socket_id)
1246            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1247
1248        let mut events = PollEvents::empty();
1249        match record.state {
1250            SocketState::Listening => {
1251                if requested.intersects(POLLIN) && record.pending_accept_count() > 0 {
1252                    events |= POLLIN;
1253                }
1254            }
1255            SocketState::Connected => {
1256                let connection = record.connection_state.as_ref().ok_or_else(|| {
1257                    SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1258                })?;
1259                let peer = connection
1260                    .peer_socket_id
1261                    .and_then(|peer_socket_id| table.sockets.get(&peer_socket_id));
1262
1263                if requested.intersects(POLLIN) && !connection.recv_buffer.is_empty() {
1264                    events |= POLLIN;
1265                }
1266                if connection.peer_write_shutdown || peer.is_none() {
1267                    events |= POLLHUP;
1268                }
1269
1270                if requested.intersects(POLLOUT) && !connection.write_shutdown {
1271                    if peer
1272                        .and_then(|peer| peer.connection_state.as_ref())
1273                        .map(|peer_connection| peer_connection.read_shutdown)
1274                        .unwrap_or(true)
1275                    {
1276                        events |= POLLERR;
1277                    } else {
1278                        events |= POLLOUT;
1279                    }
1280                }
1281            }
1282            SocketState::Bound if supports_inet_datagram_lifecycle(record.spec) => {
1283                let datagram_state = record.datagram_state.as_ref().ok_or_else(|| {
1284                    SocketTableError::invalid_argument(format!(
1285                        "socket {socket_id} does not support datagrams"
1286                    ))
1287                })?;
1288                if requested.intersects(POLLIN) && !datagram_state.recv_queue.is_empty() {
1289                    events |= POLLIN;
1290                }
1291                if requested.intersects(POLLOUT) {
1292                    events |= POLLOUT;
1293                }
1294            }
1295            SocketState::Created | SocketState::Bound => {}
1296        }
1297
1298        Ok(events)
1299    }
1300
1301    pub fn write(&self, socket_id: SocketId, data: &[u8]) -> SocketResult<usize> {
1302        let mut table = lock_or_recover(&self.inner.state);
1303        let record = table
1304            .sockets
1305            .get(&socket_id)
1306            .cloned()
1307            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1308        let connection = record.connection_state.as_ref().ok_or_else(|| {
1309            SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1310        })?;
1311        if record.state != SocketState::Connected {
1312            return Err(SocketTableError::not_connected(format!(
1313                "socket {socket_id} is not connected"
1314            )));
1315        }
1316        if connection.write_shutdown {
1317            return Err(SocketTableError::broken_pipe(format!(
1318                "socket {socket_id} write side is shut down"
1319            )));
1320        }
1321
1322        let peer_socket_id = connection.peer_socket_id.ok_or_else(|| {
1323            SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1324        })?;
1325        let peer = table.sockets.get_mut(&peer_socket_id).ok_or_else(|| {
1326            SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1327        })?;
1328        let peer_connection = peer.connection_state.as_mut().ok_or_else(|| {
1329            SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1330        })?;
1331        if peer_connection.read_shutdown {
1332            return Err(SocketTableError::broken_pipe(format!(
1333                "socket {peer_socket_id} read side is shut down"
1334            )));
1335        }
1336
1337        peer_connection.recv_buffer.extend(data.iter().copied());
1338        Ok(data.len())
1339    }
1340
1341    pub fn check_write(&self, socket_id: SocketId) -> SocketResult<()> {
1342        let table = lock_or_recover(&self.inner.state);
1343        let record = table
1344            .sockets
1345            .get(&socket_id)
1346            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1347        let connection = record.connection_state.as_ref().ok_or_else(|| {
1348            SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1349        })?;
1350        if record.state != SocketState::Connected {
1351            return Err(SocketTableError::not_connected(format!(
1352                "socket {socket_id} is not connected"
1353            )));
1354        }
1355        if connection.write_shutdown {
1356            return Err(SocketTableError::broken_pipe(format!(
1357                "socket {socket_id} write side is shut down"
1358            )));
1359        }
1360
1361        let peer_socket_id = connection.peer_socket_id.ok_or_else(|| {
1362            SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1363        })?;
1364        let peer = table.sockets.get(&peer_socket_id).ok_or_else(|| {
1365            SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1366        })?;
1367        let peer_connection = peer.connection_state.as_ref().ok_or_else(|| {
1368            SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1369        })?;
1370        if peer_connection.read_shutdown {
1371            return Err(SocketTableError::broken_pipe(format!(
1372                "socket {peer_socket_id} read side is shut down"
1373            )));
1374        }
1375
1376        Ok(())
1377    }
1378
1379    pub fn read(&self, socket_id: SocketId, max_bytes: usize) -> SocketResult<Option<Vec<u8>>> {
1380        if max_bytes == 0 {
1381            return Ok(Some(Vec::new()));
1382        }
1383
1384        let mut table = lock_or_recover(&self.inner.state);
1385        let record = table
1386            .sockets
1387            .get(&socket_id)
1388            .cloned()
1389            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1390        if record.state != SocketState::Connected {
1391            return Err(SocketTableError::not_connected(format!(
1392                "socket {socket_id} is not connected"
1393            )));
1394        }
1395
1396        let connection = record.connection_state.as_ref().ok_or_else(|| {
1397            SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1398        })?;
1399        if connection.read_shutdown {
1400            return Ok(None);
1401        }
1402        if !connection.recv_buffer.is_empty() {
1403            let record = table
1404                .sockets
1405                .get_mut(&socket_id)
1406                .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1407            let connection = record.connection_state.as_mut().ok_or_else(|| {
1408                SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1409            })?;
1410            let read_len = connection.recv_buffer.len().min(max_bytes);
1411            let bytes = connection.recv_buffer.drain(..read_len).collect::<Vec<_>>();
1412            return Ok(Some(bytes));
1413        }
1414
1415        let peer_open = connection
1416            .peer_socket_id
1417            .map(|peer_socket_id| table.sockets.contains_key(&peer_socket_id))
1418            .unwrap_or(false);
1419        if connection.peer_write_shutdown || !peer_open {
1420            return Ok(None);
1421        }
1422
1423        Err(SocketTableError::would_block(format!(
1424            "socket {socket_id} has no readable data"
1425        )))
1426    }
1427
1428    pub fn shutdown(&self, socket_id: SocketId, how: SocketShutdown) -> SocketResult<SocketRecord> {
1429        let mut table = lock_or_recover(&self.inner.state);
1430        let record = table
1431            .sockets
1432            .remove(&socket_id)
1433            .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1434
1435        if record.state != SocketState::Connected {
1436            table.sockets.insert(socket_id, record);
1437            return Err(SocketTableError::not_connected(format!(
1438                "socket {socket_id} is not connected"
1439            )));
1440        }
1441
1442        let Some(mut connection) = record.connection_state.clone() else {
1443            table.sockets.insert(socket_id, record);
1444            return Err(SocketTableError::not_connected(format!(
1445                "socket {socket_id} is not connected"
1446            )));
1447        };
1448
1449        if matches!(how, SocketShutdown::Read | SocketShutdown::Both) {
1450            connection.recv_buffer.clear();
1451            connection.read_shutdown = true;
1452        }
1453        if matches!(how, SocketShutdown::Write | SocketShutdown::Both) {
1454            connection.write_shutdown = true;
1455            if let Some(peer_socket_id) = connection.peer_socket_id {
1456                if let Some(peer) = table.sockets.get_mut(&peer_socket_id) {
1457                    if let Some(peer_connection) = peer.connection_state.as_mut() {
1458                        peer_connection.peer_write_shutdown = true;
1459                    }
1460                }
1461            }
1462        }
1463
1464        let mut record = record;
1465        record.connection_state = Some(connection);
1466        let cloned = record.clone();
1467        table.sockets.insert(socket_id, record);
1468        Ok(cloned)
1469    }
1470
1471    pub fn remove(&self, socket_id: SocketId) -> SocketResult<SocketRecord> {
1472        let mut table = lock_or_recover(&self.inner.state);
1473        remove_socket(&mut table, socket_id).ok_or_else(|| SocketTableError::not_found(socket_id))
1474    }
1475
1476    pub fn remove_all_for_pid(&self, owner_pid: u32) -> Vec<SocketRecord> {
1477        let mut table = lock_or_recover(&self.inner.state);
1478        let Some(socket_ids) = table.by_owner.remove(&owner_pid) else {
1479            return Vec::new();
1480        };
1481
1482        socket_ids
1483            .into_iter()
1484            .filter_map(|socket_id| remove_socket(&mut table, socket_id))
1485            .collect()
1486    }
1487
1488    pub fn snapshot(&self) -> SocketTableSnapshot {
1489        let table = lock_or_recover(&self.inner.state);
1490        let mut snapshot = SocketTableSnapshot {
1491            sockets: table.sockets.len(),
1492            ..SocketTableSnapshot::default()
1493        };
1494        for record in table.sockets.values() {
1495            if record.state.counts_as_listener() {
1496                snapshot.listeners += 1;
1497            }
1498            if record.state.counts_as_connection() {
1499                snapshot.connections += 1;
1500            }
1501            if let Some(connection) = &record.connection_state {
1502                snapshot.buffered_bytes = snapshot
1503                    .buffered_bytes
1504                    .saturating_add(connection.recv_buffer.len());
1505            }
1506            if let Some(datagram_state) = &record.datagram_state {
1507                snapshot.datagram_queue_len = snapshot
1508                    .datagram_queue_len
1509                    .saturating_add(datagram_state.recv_queue.len());
1510                snapshot.buffered_bytes = snapshot
1511                    .buffered_bytes
1512                    .saturating_add(datagram_queue_bytes(&datagram_state.recv_queue));
1513            }
1514        }
1515        snapshot
1516    }
1517}
1518
1519fn datagram_queue_bytes(queue: &VecDeque<QueuedDatagram>) -> usize {
1520    queue
1521        .iter()
1522        .map(|datagram| datagram.payload.len())
1523        .sum::<usize>()
1524}
1525
1526fn next_socket_id(table: &mut SocketTableState) -> SocketId {
1527    if table.next_socket_id == 0 {
1528        table.next_socket_id = 1;
1529    }
1530    let socket_id = table.next_socket_id;
1531    table.next_socket_id = table.next_socket_id.saturating_add(1);
1532    socket_id
1533}
1534
1535fn validate_state_transition(current: SocketState, next: SocketState) -> SocketResult<()> {
1536    if current == SocketState::Connected && next != SocketState::Connected {
1537        return Err(SocketTableError::invalid_argument(format!(
1538            "invalid socket state transition from {current:?} to {next:?}"
1539        )));
1540    }
1541    Ok(())
1542}
1543
1544fn validate_connect_pair(socket: &SocketRecord, peer: &SocketRecord) -> SocketResult<()> {
1545    if !supports_connection_lifecycle(socket.spec) {
1546        return Err(SocketTableError::invalid_argument(format!(
1547            "socket {} does not support stream connections",
1548            socket.id
1549        )));
1550    }
1551    if !supports_connection_lifecycle(peer.spec) {
1552        return Err(SocketTableError::invalid_argument(format!(
1553            "socket {} does not support stream connections",
1554            peer.id
1555        )));
1556    }
1557    if !matches!(socket.state, SocketState::Created | SocketState::Bound) {
1558        return Err(SocketTableError::invalid_argument(format!(
1559            "socket {} cannot connect in state {:?}",
1560            socket.id, socket.state
1561        )));
1562    }
1563    if !matches!(peer.state, SocketState::Created | SocketState::Bound) {
1564        return Err(SocketTableError::invalid_argument(format!(
1565            "socket {} cannot connect in state {:?}",
1566            peer.id, peer.state
1567        )));
1568    }
1569    Ok(())
1570}
1571
1572fn default_connection_state(spec: SocketSpec, state: SocketState) -> Option<ConnectionState> {
1573    if state == SocketState::Connected && supports_connection_lifecycle(spec) {
1574        Some(ConnectionState::default())
1575    } else {
1576        None
1577    }
1578}
1579
1580fn default_datagram_state(spec: SocketSpec) -> Option<DatagramState> {
1581    if supports_inet_datagram_lifecycle(spec) {
1582        Some(DatagramState::default())
1583    } else {
1584        None
1585    }
1586}
1587
1588fn supports_connection_lifecycle(spec: SocketSpec) -> bool {
1589    matches!(spec.socket_type, SocketType::Stream)
1590}
1591
1592fn supports_listener_lifecycle(spec: SocketSpec) -> bool {
1593    matches!(spec.socket_type, SocketType::Stream)
1594        && matches!(
1595            spec.domain,
1596            SocketDomain::Inet | SocketDomain::Inet6 | SocketDomain::Unix
1597        )
1598}
1599
1600fn supports_inet_bind(spec: SocketSpec) -> bool {
1601    matches!(spec.domain, SocketDomain::Inet | SocketDomain::Inet6)
1602        && matches!(spec.socket_type, SocketType::Stream | SocketType::Datagram)
1603}
1604
1605fn supports_unix_stream_lifecycle(spec: SocketSpec) -> bool {
1606    matches!(spec.socket_type, SocketType::Stream) && matches!(spec.domain, SocketDomain::Unix)
1607}
1608
1609fn supports_inet_stream_lifecycle(spec: SocketSpec) -> bool {
1610    matches!(spec.socket_type, SocketType::Stream)
1611        && matches!(spec.domain, SocketDomain::Inet | SocketDomain::Inet6)
1612}
1613
1614fn supports_inet_datagram_lifecycle(spec: SocketSpec) -> bool {
1615    matches!(spec.socket_type, SocketType::Datagram)
1616        && matches!(spec.domain, SocketDomain::Inet | SocketDomain::Inet6)
1617}
1618
1619fn lookup_conflicting_bound_inet_socket_ids(
1620    table: &SocketTableState,
1621    spec: SocketSpec,
1622    address: &InetSocketAddress,
1623) -> Vec<SocketId> {
1624    if supports_inet_stream_lifecycle(spec) {
1625        table
1626            .bound_inet_streams
1627            .iter()
1628            .find_map(|(bound_address, socket_id)| {
1629                inet_stream_bind_addresses_overlap(bound_address, address).then_some(*socket_id)
1630            })
1631            .into_iter()
1632            .collect()
1633    } else if supports_inet_datagram_lifecycle(spec) {
1634        table
1635            .bound_inet_datagrams
1636            .iter()
1637            .filter(|(bound_address, _)| inet_stream_bind_addresses_overlap(bound_address, address))
1638            .flat_map(|(_, socket_ids)| socket_ids.iter().copied())
1639            .collect()
1640    } else {
1641        Vec::new()
1642    }
1643}
1644
1645fn lookup_bound_inet_socket(
1646    table: &SocketTableState,
1647    spec: SocketSpec,
1648    address: &InetSocketAddress,
1649) -> Option<SocketId> {
1650    if supports_inet_stream_lifecycle(spec) {
1651        lookup_bound_inet_socket_in_table(&table.bound_inet_streams, address)
1652    } else if supports_inet_datagram_lifecycle(spec) {
1653        lookup_bound_inet_datagram_socket_in_table(&table.bound_inet_datagrams, address)
1654    } else {
1655        None
1656    }
1657}
1658
1659fn inet_stream_bind_addresses_overlap(
1660    existing: &InetSocketAddress,
1661    requested: &InetSocketAddress,
1662) -> bool {
1663    if existing == requested {
1664        return true;
1665    }
1666
1667    wildcard_inet_address(existing).as_ref() == Some(requested)
1668        || wildcard_inet_address(requested).as_ref() == Some(existing)
1669}
1670
1671fn lookup_bound_inet_socket_in_table(
1672    sockets: &BTreeMap<InetSocketAddress, SocketId>,
1673    address: &InetSocketAddress,
1674) -> Option<SocketId> {
1675    sockets.get(address).copied().or_else(|| {
1676        wildcard_inet_address(address).and_then(|wildcard| sockets.get(&wildcard).copied())
1677    })
1678}
1679
1680fn lookup_bound_inet_datagram_socket_in_table(
1681    sockets: &BTreeMap<InetSocketAddress, BTreeSet<SocketId>>,
1682    address: &InetSocketAddress,
1683) -> Option<SocketId> {
1684    sockets
1685        .get(address)
1686        .and_then(|socket_ids| socket_ids.first().copied())
1687        .or_else(|| {
1688            wildcard_inet_address(address).and_then(|wildcard| {
1689                sockets
1690                    .get(&wildcard)
1691                    .and_then(|socket_ids| socket_ids.first().copied())
1692            })
1693        })
1694}
1695
1696fn register_bound_inet_socket(
1697    table: &mut SocketTableState,
1698    spec: SocketSpec,
1699    address: InetSocketAddress,
1700    socket_id: SocketId,
1701) {
1702    if supports_inet_stream_lifecycle(spec) {
1703        table.bound_inet_streams.insert(address, socket_id);
1704    } else if supports_inet_datagram_lifecycle(spec) {
1705        table
1706            .bound_inet_datagrams
1707            .entry(address)
1708            .or_default()
1709            .insert(socket_id);
1710    }
1711}
1712
1713fn validate_connect_to_listener(
1714    client: &SocketRecord,
1715    listener: &SocketRecord,
1716) -> SocketResult<()> {
1717    if !supports_connection_lifecycle(client.spec) {
1718        return Err(SocketTableError::invalid_argument(format!(
1719            "socket {} does not support stream connections",
1720            client.id
1721        )));
1722    }
1723    if !supports_listener_lifecycle(listener.spec) {
1724        return Err(SocketTableError::invalid_argument(format!(
1725            "socket {} is not a stream listener",
1726            listener.id
1727        )));
1728    }
1729    if !matches!(client.state, SocketState::Created | SocketState::Bound) {
1730        return Err(SocketTableError::invalid_argument(format!(
1731            "socket {} cannot connect in state {:?}",
1732            client.id, client.state
1733        )));
1734    }
1735    if listener.state != SocketState::Listening {
1736        return Err(SocketTableError::invalid_argument(format!(
1737            "socket {} is not listening",
1738            listener.id
1739        )));
1740    }
1741    Ok(())
1742}
1743
1744fn has_bound_endpoint(record: &SocketRecord) -> bool {
1745    record.local_address.is_some() || record.local_unix_path.is_some()
1746}
1747
1748fn validate_bound_udp_sender(sender: &SocketRecord) -> SocketResult<()> {
1749    if !supports_inet_datagram_lifecycle(sender.spec) {
1750        return Err(SocketTableError::invalid_argument(format!(
1751            "socket {} is not an INET datagram socket",
1752            sender.id
1753        )));
1754    }
1755    if sender.state != SocketState::Bound || sender.local_address.is_none() {
1756        return Err(SocketTableError::invalid_argument(format!(
1757            "socket {} must be bound before sending datagrams",
1758            sender.id
1759        )));
1760    }
1761    Ok(())
1762}
1763
1764fn validate_bound_udp_receiver(receiver: &SocketRecord) -> SocketResult<()> {
1765    if !supports_inet_datagram_lifecycle(receiver.spec) {
1766        return Err(SocketTableError::invalid_argument(format!(
1767            "socket {} is not an INET datagram socket",
1768            receiver.id
1769        )));
1770    }
1771    if receiver.state != SocketState::Bound || receiver.local_address.is_none() {
1772        return Err(SocketTableError::invalid_argument(format!(
1773            "socket {} must be bound to receive datagrams",
1774            receiver.id
1775        )));
1776    }
1777    Ok(())
1778}
1779
1780fn datagram_state_mut(record: &mut SocketRecord) -> SocketResult<&mut DatagramState> {
1781    if !supports_inet_datagram_lifecycle(record.spec) {
1782        return Err(SocketTableError::invalid_argument(format!(
1783            "socket {} is not an INET datagram socket",
1784            record.id
1785        )));
1786    }
1787    record.datagram_state.as_mut().ok_or_else(|| {
1788        SocketTableError::invalid_argument(format!(
1789            "socket {} does not support datagrams",
1790            record.id
1791        ))
1792    })
1793}
1794
1795fn validate_multicast_socket(record: &SocketRecord) -> SocketResult<()> {
1796    validate_bound_udp_receiver(record)?;
1797    if record.spec.domain != SocketDomain::Inet {
1798        return Err(SocketTableError::invalid_argument(format!(
1799            "socket {} multicast membership is only implemented for IPv4 datagrams",
1800            record.id
1801        )));
1802    }
1803    Ok(())
1804}
1805
1806fn normalize_multicast_membership(
1807    spec: SocketSpec,
1808    membership: SocketMulticastMembership,
1809) -> SocketResult<SocketMulticastMembership> {
1810    let group_address = membership.group_address.trim().to_ascii_lowercase();
1811    let interface_address = membership
1812        .interface_address
1813        .map(|value| value.trim().to_ascii_lowercase())
1814        .filter(|value| !value.is_empty());
1815
1816    match spec.domain {
1817        SocketDomain::Inet => {
1818            let parsed = group_address.parse::<Ipv4Addr>().map_err(|_| {
1819                SocketTableError::invalid_argument(format!(
1820                    "invalid IPv4 multicast address {group_address}"
1821                ))
1822            })?;
1823            if !parsed.is_multicast() {
1824                return Err(SocketTableError::invalid_argument(format!(
1825                    "address {group_address} is not an IPv4 multicast group"
1826                )));
1827            }
1828        }
1829        SocketDomain::Inet6 => {
1830            let parsed = group_address.parse::<Ipv6Addr>().map_err(|_| {
1831                SocketTableError::invalid_argument(format!(
1832                    "invalid IPv6 multicast address {group_address}"
1833                ))
1834            })?;
1835            if !parsed.is_multicast() {
1836                return Err(SocketTableError::invalid_argument(format!(
1837                    "address {group_address} is not an IPv6 multicast group"
1838                )));
1839            }
1840        }
1841        SocketDomain::Unix => {
1842            return Err(SocketTableError::invalid_argument(
1843                "unix sockets do not support multicast membership",
1844            ));
1845        }
1846    }
1847
1848    Ok(SocketMulticastMembership::new(
1849        group_address,
1850        interface_address,
1851    ))
1852}
1853
1854fn has_incompatible_inet_bind_conflict(
1855    table: &SocketTableState,
1856    record: &SocketRecord,
1857    conflicting_ids: &[SocketId],
1858) -> bool {
1859    conflicting_ids.iter().any(|conflicting_id| {
1860        if *conflicting_id == record.id {
1861            return false;
1862        }
1863
1864        let Some(existing) = table.sockets.get(conflicting_id) else {
1865            return false;
1866        };
1867
1868        if supports_inet_datagram_lifecycle(record.spec) {
1869            !inet_datagram_bind_shares_port(record, existing)
1870        } else {
1871            true
1872        }
1873    })
1874}
1875
1876fn inet_datagram_bind_shares_port(requested: &SocketRecord, existing: &SocketRecord) -> bool {
1877    (requested.reuse_port() && existing.reuse_port())
1878        || (requested.reuse_address() && existing.reuse_address())
1879}
1880
1881fn remove_socket(table: &mut SocketTableState, socket_id: SocketId) -> Option<SocketRecord> {
1882    let record = table.sockets.remove(&socket_id)?;
1883    unregister_bound_socket(table, &record);
1884    unregister_multicast_memberships(table, &record);
1885    if let Some(listener_state) = record.listener_state.as_ref() {
1886        let pending_socket_ids = listener_state
1887            .pending_accepts
1888            .iter()
1889            .filter_map(|pending| pending.accepted_socket_id)
1890            .collect::<Vec<_>>();
1891        for pending_socket_id in pending_socket_ids {
1892            let _ = remove_socket(table, pending_socket_id);
1893        }
1894    }
1895    if let Some(connection) = record.connection_state.as_ref() {
1896        if let Some(peer_socket_id) = connection.peer_socket_id {
1897            if let Some(peer) = table.sockets.get_mut(&peer_socket_id) {
1898                if let Some(peer_connection) = peer.connection_state.as_mut() {
1899                    if peer_connection.peer_socket_id == Some(socket_id) {
1900                        peer_connection.peer_socket_id = None;
1901                    }
1902                    peer_connection.peer_write_shutdown = true;
1903                }
1904            }
1905        }
1906    }
1907    if let Some(owner_sockets) = table.by_owner.get_mut(&record.owner_pid) {
1908        owner_sockets.remove(&socket_id);
1909        if owner_sockets.is_empty() {
1910            table.by_owner.remove(&record.owner_pid);
1911        }
1912    }
1913    Some(record)
1914}
1915
1916fn unregister_bound_socket(table: &mut SocketTableState, record: &SocketRecord) {
1917    let Some(address) = record.local_address.as_ref() else {
1918        if supports_unix_stream_lifecycle(record.spec) {
1919            if let Some(path) = record.local_unix_path.as_ref() {
1920                if table.bound_unix_streams.get(path).copied() == Some(record.id) {
1921                    table.bound_unix_streams.remove(path);
1922                }
1923            }
1924        }
1925        return;
1926    };
1927    if supports_inet_stream_lifecycle(record.spec)
1928        && table.bound_inet_streams.get(address).copied() == Some(record.id)
1929    {
1930        table.bound_inet_streams.remove(address);
1931    }
1932    if supports_inet_datagram_lifecycle(record.spec) {
1933        if let Some(socket_ids) = table.bound_inet_datagrams.get_mut(address) {
1934            socket_ids.remove(&record.id);
1935            if socket_ids.is_empty() {
1936                table.bound_inet_datagrams.remove(address);
1937            }
1938        }
1939    }
1940}
1941
1942fn unregister_multicast_memberships(table: &mut SocketTableState, record: &SocketRecord) {
1943    let Some(datagram_state) = record.datagram_state.as_ref() else {
1944        return;
1945    };
1946
1947    for membership in &datagram_state.multicast_memberships {
1948        if let Some(socket_ids) = table.multicast_groups.get_mut(membership) {
1949            socket_ids.remove(&record.id);
1950            if socket_ids.is_empty() {
1951                table.multicast_groups.remove(membership);
1952            }
1953        }
1954    }
1955}
1956
1957fn normalize_inet_address(address: InetSocketAddress) -> InetSocketAddress {
1958    match address.host().to_ascii_lowercase().as_str() {
1959        "localhost" => InetSocketAddress::new("127.0.0.1", address.port()),
1960        _ => address,
1961    }
1962}
1963
1964fn wildcard_inet_address(address: &InetSocketAddress) -> Option<InetSocketAddress> {
1965    match address.host() {
1966        "127.0.0.1" => Some(InetSocketAddress::new("0.0.0.0", address.port())),
1967        "::1" => Some(InetSocketAddress::new("::", address.port())),
1968        _ => None,
1969    }
1970}
1971
1972fn normalize_unix_socket_path(path: impl AsRef<str>) -> SocketResult<String> {
1973    let normalized = normalize_path(path.as_ref());
1974    if normalized == "/" {
1975        return Err(SocketTableError::invalid_argument(
1976            "unix socket path must not be empty or root",
1977        ));
1978    }
1979    Ok(normalized)
1980}
1981
/// Locks `mutex`, returning the guard even when the mutex is poisoned.
///
/// A poisoned lock is recovered by taking the guard out of the poison error
/// instead of propagating it.
fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>) -> MutexGuard<'a, T> {
    mutex
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}