1use crate::poll::{PollEvents, POLLERR, POLLHUP, POLLIN, POLLOUT};
2use crate::vfs::normalize_path;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::error::Error;
5use std::fmt;
6use std::net::{Ipv4Addr, Ipv6Addr};
7use std::sync::{Arc, Mutex, MutexGuard};
8
/// Identifier under which a socket is stored in the socket table.
pub type SocketId = u64;
/// Result alias used by socket table operations; the error is always a
/// [`SocketTableError`].
pub type SocketResult<T> = Result<T, SocketTableError>;
11
/// Host/port pair identifying an INET socket endpoint.
///
/// The derived ordering compares `host` first and `port` second.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct InetSocketAddress {
    /// Host component, stored exactly as passed to [`InetSocketAddress::new`].
    host: String,
    /// Port component.
    port: u16,
}

impl InetSocketAddress {
    /// Builds an address from any value convertible into an owned host string.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host = host.into();
        Self { host, port }
    }

    /// Returns the host component as a string slice.
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Returns the port component.
    pub const fn port(&self) -> u16 {
        self.port
    }
}
34
/// Domain a socket is created in.
///
/// The derived ordering follows declaration order (`Inet < Inet6 < Unix`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SocketDomain {
    /// Domain used by [`SocketSpec::tcp`] and [`SocketSpec::udp`].
    Inet,
    // NOTE(review): presumably the IPv6 counterpart of `Inet`; nothing in this
    // part of the file constructs it directly — confirm against callers.
    Inet6,
    /// Domain used by [`SocketSpec::unix_stream`] and
    /// [`SocketSpec::unix_datagram`].
    Unix,
}
41
/// Transport style of a socket.
///
/// The derived ordering follows declaration order (`Stream < Datagram`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SocketType {
    /// Type used by [`SocketSpec::tcp`] and [`SocketSpec::unix_stream`].
    Stream,
    /// Type used by [`SocketSpec::udp`] and [`SocketSpec::unix_datagram`].
    Datagram,
}
47
/// Lifecycle state recorded for a socket.
///
/// The derived ordering follows declaration order
/// (`Created < Bound < Listening < Connected`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SocketState {
    Created,
    Bound,
    Listening,
    Connected,
}

impl SocketState {
    /// Returns `true` only for [`SocketState::Listening`].
    pub const fn counts_as_listener(self) -> bool {
        match self {
            Self::Listening => true,
            Self::Created | Self::Bound | Self::Connected => false,
        }
    }

    /// Returns `true` only for [`SocketState::Connected`].
    pub const fn counts_as_connection(self) -> bool {
        match self {
            Self::Connected => true,
            Self::Created | Self::Bound | Self::Listening => false,
        }
    }
}
65
/// Selects which direction(s) of a socket a shutdown request applies to.
// NOTE(review): the code consuming this enum is outside this part of the
// file; presumably it drives the `read_shutdown` / `write_shutdown` flags on
// the connection state — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SocketShutdown {
    Read,
    Write,
    Both,
}
72
/// Boolean options that `SocketTable::set_datagram_socket_option` can toggle
/// on a socket's datagram state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatagramSocketOption {
    /// Toggles the `reuse_addr` flag.
    ReuseAddr,
    /// Toggles the `reuse_port` flag.
    ReusePort,
    /// Toggles the `broadcast` flag.
    Broadcast,
}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct SocketSpec {
82 pub domain: SocketDomain,
83 pub socket_type: SocketType,
84}
85
86impl SocketSpec {
87 pub const fn new(domain: SocketDomain, socket_type: SocketType) -> Self {
88 Self {
89 domain,
90 socket_type,
91 }
92 }
93
94 pub const fn tcp() -> Self {
95 Self::new(SocketDomain::Inet, SocketType::Stream)
96 }
97
98 pub const fn udp() -> Self {
99 Self::new(SocketDomain::Inet, SocketType::Datagram)
100 }
101
102 pub const fn unix_stream() -> Self {
103 Self::new(SocketDomain::Unix, SocketType::Stream)
104 }
105
106 pub const fn unix_datagram() -> Self {
107 Self::new(SocketDomain::Unix, SocketType::Datagram)
108 }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct SocketRecord {
113 id: SocketId,
114 owner_pid: u32,
115 spec: SocketSpec,
116 state: SocketState,
117 local_address: Option<InetSocketAddress>,
118 peer_address: Option<InetSocketAddress>,
119 local_unix_path: Option<String>,
120 peer_unix_path: Option<String>,
121 listener_state: Option<ListenerState>,
122 connection_state: Option<ConnectionState>,
123 datagram_state: Option<DatagramState>,
124}
125
126impl SocketRecord {
127 pub const fn id(&self) -> SocketId {
128 self.id
129 }
130
131 pub const fn owner_pid(&self) -> u32 {
132 self.owner_pid
133 }
134
135 pub const fn spec(&self) -> SocketSpec {
136 self.spec
137 }
138
139 pub const fn state(&self) -> SocketState {
140 self.state
141 }
142
143 pub fn local_address(&self) -> Option<&InetSocketAddress> {
144 self.local_address.as_ref()
145 }
146
147 pub fn peer_address(&self) -> Option<&InetSocketAddress> {
148 self.peer_address.as_ref()
149 }
150
151 pub fn local_unix_path(&self) -> Option<&str> {
152 self.local_unix_path.as_deref()
153 }
154
155 pub fn peer_unix_path(&self) -> Option<&str> {
156 self.peer_unix_path.as_deref()
157 }
158
159 pub fn listen_backlog(&self) -> Option<usize> {
160 self.listener_state.as_ref().map(|state| state.backlog)
161 }
162
163 pub fn pending_accept_count(&self) -> usize {
164 self.listener_state
165 .as_ref()
166 .map(|state| state.pending_accepts.len())
167 .unwrap_or(0)
168 }
169
170 pub fn peer_socket_id(&self) -> Option<SocketId> {
171 self.connection_state
172 .as_ref()
173 .and_then(|state| state.peer_socket_id)
174 }
175
176 pub fn buffered_read_bytes(&self) -> usize {
177 self.connection_state
178 .as_ref()
179 .map(|state| state.recv_buffer.len())
180 .unwrap_or(0)
181 }
182
183 pub fn read_shutdown(&self) -> bool {
184 self.connection_state
185 .as_ref()
186 .map(|state| state.read_shutdown)
187 .unwrap_or(false)
188 }
189
190 pub fn write_shutdown(&self) -> bool {
191 self.connection_state
192 .as_ref()
193 .map(|state| state.write_shutdown)
194 .unwrap_or(false)
195 }
196
197 pub fn peer_write_shutdown(&self) -> bool {
198 self.connection_state
199 .as_ref()
200 .map(|state| state.peer_write_shutdown)
201 .unwrap_or(false)
202 }
203
204 pub fn queued_datagrams(&self) -> usize {
205 self.datagram_state
206 .as_ref()
207 .map(|state| state.recv_queue.len())
208 .unwrap_or(0)
209 }
210
211 pub fn queued_datagram_bytes(&self) -> usize {
212 self.datagram_state
213 .as_ref()
214 .map(|state| datagram_queue_bytes(&state.recv_queue))
215 .unwrap_or(0)
216 }
217
218 pub fn reuse_address(&self) -> bool {
219 self.datagram_state
220 .as_ref()
221 .map(|state| state.reuse_addr)
222 .unwrap_or(false)
223 }
224
225 pub fn reuse_port(&self) -> bool {
226 self.datagram_state
227 .as_ref()
228 .map(|state| state.reuse_port)
229 .unwrap_or(false)
230 }
231
232 pub fn broadcast_enabled(&self) -> bool {
233 self.datagram_state
234 .as_ref()
235 .map(|state| state.broadcast)
236 .unwrap_or(false)
237 }
238
239 pub fn multicast_membership_count(&self) -> usize {
240 self.datagram_state
241 .as_ref()
242 .map(|state| state.multicast_memberships.len())
243 .unwrap_or(0)
244 }
245
246 pub fn has_multicast_membership(&self, membership: &SocketMulticastMembership) -> bool {
247 self.datagram_state
248 .as_ref()
249 .map(|state| state.multicast_memberships.contains(membership))
250 .unwrap_or(false)
251 }
252}
253
254#[derive(Debug, Clone, PartialEq, Eq)]
255pub struct ReceivedDatagram {
256 source_address: Option<InetSocketAddress>,
257 payload: Vec<u8>,
258}
259
260impl ReceivedDatagram {
261 pub fn source_address(&self) -> Option<&InetSocketAddress> {
262 self.source_address.as_ref()
263 }
264
265 pub fn payload(&self) -> &[u8] {
266 &self.payload
267 }
268
269 pub fn into_parts(self) -> (Option<InetSocketAddress>, Vec<u8>) {
270 (self.source_address, self.payload)
271 }
272}
273
/// Plain counters summarising a socket table; `Default` yields all zeros.
// NOTE(review): the code that fills this struct in is outside this part of
// the file, so the exact meaning of each counter is inferred from its name —
// confirm against the producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SocketTableSnapshot {
    pub sockets: usize,
    pub listeners: usize,
    pub connections: usize,
    pub buffered_bytes: usize,
    pub datagram_queue_len: usize,
}
282
/// A multicast group membership: group address plus optional interface address.
///
/// The derived ordering compares `group_address` first, then
/// `interface_address` (with `None` ordered before `Some`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SocketMulticastMembership {
    group_address: String,
    interface_address: Option<String>,
}

impl SocketMulticastMembership {
    /// Builds a membership; both values are stored as given.
    pub fn new(group_address: impl Into<String>, interface_address: Option<String>) -> Self {
        let group_address = group_address.into();
        Self {
            group_address,
            interface_address,
        }
    }

    /// Group address as a string slice.
    pub fn group_address(&self) -> &str {
        self.group_address.as_str()
    }

    /// Interface address, if one was supplied.
    pub fn interface_address(&self) -> Option<&str> {
        match &self.interface_address {
            Some(address) => Some(address.as_str()),
            None => None,
        }
    }
}
305
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct SocketTableError {
308 code: &'static str,
309 message: String,
310}
311
312impl SocketTableError {
313 pub fn code(&self) -> &'static str {
314 self.code
315 }
316
317 fn not_found(socket_id: SocketId) -> Self {
318 Self {
319 code: "ENOENT",
320 message: format!("no such socket {socket_id}"),
321 }
322 }
323
324 fn invalid_argument(message: impl Into<String>) -> Self {
325 Self {
326 code: "EINVAL",
327 message: message.into(),
328 }
329 }
330
331 fn address_in_use(message: impl Into<String>) -> Self {
332 Self {
333 code: "EADDRINUSE",
334 message: message.into(),
335 }
336 }
337
338 fn address_not_available(message: impl Into<String>) -> Self {
339 Self {
340 code: "EADDRNOTAVAIL",
341 message: message.into(),
342 }
343 }
344
345 fn not_found_address(message: impl Into<String>) -> Self {
346 Self {
347 code: "ECONNREFUSED",
348 message: message.into(),
349 }
350 }
351
352 fn would_block(message: impl Into<String>) -> Self {
353 Self {
354 code: "EAGAIN",
355 message: message.into(),
356 }
357 }
358
359 fn not_connected(message: impl Into<String>) -> Self {
360 Self {
361 code: "ENOTCONN",
362 message: message.into(),
363 }
364 }
365
366 fn broken_pipe(message: impl Into<String>) -> Self {
367 Self {
368 code: "EPIPE",
369 message: message.into(),
370 }
371 }
372}
373
374impl fmt::Display for SocketTableError {
375 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376 write!(f, "{}: {}", self.code, self.message)
377 }
378}
379
380impl Error for SocketTableError {}
381
/// Mutable contents of the socket table, kept behind the table's mutex.
#[derive(Debug, Default)]
struct SocketTableState {
    /// Every socket record, keyed by id.
    sockets: BTreeMap<SocketId, SocketRecord>,
    /// Owner pid -> ids of sockets allocated for that pid.
    by_owner: BTreeMap<u32, BTreeSet<SocketId>>,
    /// INET address -> id of the stream socket registered at it.
    bound_inet_streams: BTreeMap<InetSocketAddress, SocketId>,
    /// INET address -> ids of the datagram sockets registered at it.
    bound_inet_datagrams: BTreeMap<InetSocketAddress, BTreeSet<SocketId>>,
    /// Normalized Unix path -> id of the stream socket bound to it.
    bound_unix_streams: BTreeMap<String, SocketId>,
    /// Normalized membership -> ids of the sockets that joined it.
    multicast_groups: BTreeMap<SocketMulticastMembership, BTreeSet<SocketId>>,
    // NOTE(review): presumably the counter consumed by the `next_socket_id`
    // helper, which is defined outside this part of the file — confirm.
    next_socket_id: SocketId,
}
392
/// Extra state attached to a socket once `listen` has succeeded.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ListenerState {
    /// Maximum number of entries allowed in `pending_accepts`; enqueueing
    /// fails with `EAGAIN` once the queue length reaches this value.
    backlog: usize,
    /// Connections waiting to be returned by `accept`, oldest first.
    pending_accepts: VecDeque<PendingConnection>,
}
398
/// Extra state attached to a connected socket; `Default` yields no peer, an
/// empty buffer and all flags cleared.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct ConnectionState {
    /// Id of the socket at the other end, when the peer lives in this table.
    peer_socket_id: Option<SocketId>,
    /// Bytes buffered for this socket to read.
    recv_buffer: VecDeque<u8>,
    // NOTE(review): the three flags below are written outside this part of
    // the file; presumably they mirror shutdown(2) on the local read side,
    // the local write side and the peer's write side — confirm.
    read_shutdown: bool,
    write_shutdown: bool,
    peer_write_shutdown: bool,
}
407
/// One entry in a listener's pending-accept queue.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PendingConnection {
    /// INET address of the connecting peer, when one is recorded.
    peer_address: Option<InetSocketAddress>,
    /// Unix path of the connecting peer, when one is recorded.
    peer_unix_path: Option<String>,
    /// `Some(id)` when the accepted-side socket was already created at connect
    /// time (`accept` then just returns that record); `None` when `accept`
    /// must create the socket itself.
    accepted_socket_id: Option<SocketId>,
}
414
/// Extra state attached to sockets that support datagrams; `Default` yields
/// an empty queue, all options off and no memberships.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct DatagramState {
    /// Datagrams delivered to this socket and not yet received, oldest first.
    recv_queue: VecDeque<QueuedDatagram>,
    /// Flag toggled by `DatagramSocketOption::ReuseAddr`.
    reuse_addr: bool,
    /// Flag toggled by `DatagramSocketOption::ReusePort`.
    reuse_port: bool,
    /// Flag toggled by `DatagramSocketOption::Broadcast`.
    broadcast: bool,
    /// Normalized multicast memberships this socket has joined.
    multicast_memberships: BTreeSet<SocketMulticastMembership>,
}
423
/// A datagram sitting in a receiver's queue.
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueuedDatagram {
    /// Local address of the sending socket at send time, if it had one.
    source_address: Option<InetSocketAddress>,
    /// Copy of the bytes that were sent.
    payload: Vec<u8>,
}
429
/// Shared interior of a [`SocketTable`]: the table state behind a mutex.
#[derive(Debug, Default)]
struct SocketTableInner {
    /// All table contents; every table operation locks this first.
    state: Mutex<SocketTableState>,
}
434
/// Handle to a socket table.
///
/// The handle wraps an `Arc`, so cloning it yields another handle to the same
/// underlying state rather than a copy of the table.
#[derive(Debug, Clone, Default)]
pub struct SocketTable {
    inner: Arc<SocketTableInner>,
}
439
440impl SocketTable {
441 pub fn new() -> Self {
442 Self::default()
443 }
444
445 pub fn allocate(&self, owner_pid: u32, spec: SocketSpec) -> SocketRecord {
446 self.allocate_with_state(owner_pid, spec, SocketState::Created)
447 }
448
449 pub fn allocate_with_state(
450 &self,
451 owner_pid: u32,
452 spec: SocketSpec,
453 state: SocketState,
454 ) -> SocketRecord {
455 let mut table = lock_or_recover(&self.inner.state);
456 let socket_id = next_socket_id(&mut table);
457 let record = SocketRecord {
458 id: socket_id,
459 owner_pid,
460 spec,
461 state,
462 local_address: None,
463 peer_address: None,
464 local_unix_path: None,
465 peer_unix_path: None,
466 listener_state: None,
467 connection_state: default_connection_state(spec, state),
468 datagram_state: default_datagram_state(spec),
469 };
470 table.sockets.insert(socket_id, record.clone());
471 table
472 .by_owner
473 .entry(owner_pid)
474 .or_default()
475 .insert(socket_id);
476 record
477 }
478
479 pub fn get(&self, socket_id: SocketId) -> Option<SocketRecord> {
480 lock_or_recover(&self.inner.state)
481 .sockets
482 .get(&socket_id)
483 .cloned()
484 }
485
486 pub fn update_state(
487 &self,
488 socket_id: SocketId,
489 new_state: SocketState,
490 ) -> SocketResult<SocketRecord> {
491 let mut table = lock_or_recover(&self.inner.state);
492 let record = table
493 .sockets
494 .get_mut(&socket_id)
495 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
496 validate_state_transition(record.state, new_state)?;
497 record.state = new_state;
498 if new_state != SocketState::Listening {
499 record.listener_state = None;
500 }
501 if new_state == SocketState::Connected && supports_connection_lifecycle(record.spec) {
502 record
503 .connection_state
504 .get_or_insert_with(ConnectionState::default);
505 } else if new_state != SocketState::Connected {
506 record.connection_state = None;
507 }
508 Ok(record.clone())
509 }
510
511 pub fn bind_inet(
512 &self,
513 socket_id: SocketId,
514 address: InetSocketAddress,
515 ) -> SocketResult<SocketRecord> {
516 let address = normalize_inet_address(address);
517 let mut table = lock_or_recover(&self.inner.state);
518 let existing = table
519 .sockets
520 .get(&socket_id)
521 .cloned()
522 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
523 if !supports_inet_bind(existing.spec) {
524 return Err(SocketTableError::invalid_argument(format!(
525 "socket {socket_id} is not an INET socket"
526 )));
527 }
528 let conflicting_ids =
529 lookup_conflicting_bound_inet_socket_ids(&table, existing.spec, &address);
530 if has_incompatible_inet_bind_conflict(&table, &existing, &conflicting_ids) {
531 return Err(SocketTableError::address_in_use(format!(
532 "address {}:{} is already bound",
533 address.host(),
534 address.port()
535 )));
536 }
537 let cloned = {
538 let record = table
539 .sockets
540 .get_mut(&socket_id)
541 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
542
543 match record.state {
544 SocketState::Created => {}
545 SocketState::Bound if record.local_address.as_ref() == Some(&address) => {
546 return Ok(record.clone());
547 }
548 SocketState::Bound | SocketState::Listening | SocketState::Connected => {
549 return Err(SocketTableError::invalid_argument(format!(
550 "socket {socket_id} cannot bind in state {:?}",
551 record.state
552 )));
553 }
554 }
555
556 record.local_address = Some(address.clone());
557 record.peer_address = None;
558 record.local_unix_path = None;
559 record.peer_unix_path = None;
560 record.listener_state = None;
561 record.connection_state = None;
562 record.state = SocketState::Bound;
563 record.clone()
564 };
565 register_bound_inet_socket(&mut table, cloned.spec, address, socket_id);
566 Ok(cloned)
567 }
568
569 pub fn set_datagram_socket_option(
570 &self,
571 socket_id: SocketId,
572 option: DatagramSocketOption,
573 enabled: bool,
574 ) -> SocketResult<SocketRecord> {
575 let mut table = lock_or_recover(&self.inner.state);
576 let record = table
577 .sockets
578 .get_mut(&socket_id)
579 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
580 let datagram_state = datagram_state_mut(record)?;
581
582 match option {
583 DatagramSocketOption::ReuseAddr => datagram_state.reuse_addr = enabled,
584 DatagramSocketOption::ReusePort => datagram_state.reuse_port = enabled,
585 DatagramSocketOption::Broadcast => datagram_state.broadcast = enabled,
586 }
587
588 Ok(record.clone())
589 }
590
591 pub fn add_multicast_membership(
592 &self,
593 socket_id: SocketId,
594 membership: SocketMulticastMembership,
595 ) -> SocketResult<SocketRecord> {
596 let mut table = lock_or_recover(&self.inner.state);
597 let normalized_membership = {
598 let record = table
599 .sockets
600 .get(&socket_id)
601 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
602 validate_multicast_socket(record)?;
603 normalize_multicast_membership(record.spec, membership)?
604 };
605
606 let cloned = {
607 let record = table
608 .sockets
609 .get_mut(&socket_id)
610 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
611 let datagram_state = datagram_state_mut(record)?;
612 datagram_state
613 .multicast_memberships
614 .insert(normalized_membership.clone());
615 record.clone()
616 };
617
618 table
619 .multicast_groups
620 .entry(normalized_membership)
621 .or_default()
622 .insert(socket_id);
623 Ok(cloned)
624 }
625
626 pub fn drop_multicast_membership(
627 &self,
628 socket_id: SocketId,
629 membership: SocketMulticastMembership,
630 ) -> SocketResult<SocketRecord> {
631 let mut table = lock_or_recover(&self.inner.state);
632 let normalized_membership = {
633 let record = table
634 .sockets
635 .get(&socket_id)
636 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
637 validate_multicast_socket(record)?;
638 normalize_multicast_membership(record.spec, membership)?
639 };
640
641 let cloned = {
642 let record = table
643 .sockets
644 .get_mut(&socket_id)
645 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
646 let datagram_state = datagram_state_mut(record)?;
647 if !datagram_state
648 .multicast_memberships
649 .remove(&normalized_membership)
650 {
651 return Err(SocketTableError::address_not_available(format!(
652 "socket {socket_id} has not joined multicast group {}",
653 normalized_membership.group_address()
654 )));
655 }
656 record.clone()
657 };
658
659 if let Some(members) = table.multicast_groups.get_mut(&normalized_membership) {
660 members.remove(&socket_id);
661 if members.is_empty() {
662 table.multicast_groups.remove(&normalized_membership);
663 }
664 }
665
666 Ok(cloned)
667 }
668
669 pub fn bind_unix(
670 &self,
671 socket_id: SocketId,
672 path: impl Into<String>,
673 ) -> SocketResult<SocketRecord> {
674 let path = normalize_unix_socket_path(path.into())?;
675 let mut table = lock_or_recover(&self.inner.state);
676 let existing = table
677 .sockets
678 .get(&socket_id)
679 .cloned()
680 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
681 if !supports_unix_stream_lifecycle(existing.spec) {
682 return Err(SocketTableError::invalid_argument(format!(
683 "socket {socket_id} is not a Unix stream socket"
684 )));
685 }
686 let existing_id = table.bound_unix_streams.get(&path).copied();
687 let cloned = {
688 let record = table
689 .sockets
690 .get_mut(&socket_id)
691 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
692
693 if let Some(bound_socket_id) = existing_id {
694 if bound_socket_id != socket_id {
695 return Err(SocketTableError::address_in_use(format!(
696 "path {path} is already bound"
697 )));
698 }
699 }
700
701 match record.state {
702 SocketState::Created => {}
703 SocketState::Bound if record.local_unix_path.as_deref() == Some(path.as_str()) => {
704 return Ok(record.clone());
705 }
706 SocketState::Bound | SocketState::Listening | SocketState::Connected => {
707 return Err(SocketTableError::invalid_argument(format!(
708 "socket {socket_id} cannot bind in state {:?}",
709 record.state
710 )));
711 }
712 }
713
714 record.local_address = None;
715 record.peer_address = None;
716 record.local_unix_path = Some(path.clone());
717 record.peer_unix_path = None;
718 record.listener_state = None;
719 record.connection_state = None;
720 record.state = SocketState::Bound;
721 record.clone()
722 };
723 table.bound_unix_streams.insert(path, socket_id);
724 Ok(cloned)
725 }
726
727 pub fn listen(&self, socket_id: SocketId, backlog: usize) -> SocketResult<SocketRecord> {
728 if backlog == 0 {
729 return Err(SocketTableError::invalid_argument(
730 "listener backlog must be greater than zero",
731 ));
732 }
733
734 let mut table = lock_or_recover(&self.inner.state);
735 let record = table
736 .sockets
737 .get_mut(&socket_id)
738 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
739
740 if !supports_listener_lifecycle(record.spec) {
741 return Err(SocketTableError::invalid_argument(format!(
742 "socket {socket_id} is not a stream socket"
743 )));
744 }
745 if record.state != SocketState::Bound || !has_bound_endpoint(record) {
746 return Err(SocketTableError::invalid_argument(format!(
747 "socket {socket_id} must be bound before listen"
748 )));
749 }
750
751 record.state = SocketState::Listening;
752 record.listener_state = Some(ListenerState {
753 backlog,
754 pending_accepts: VecDeque::new(),
755 });
756 Ok(record.clone())
757 }
758
759 pub fn enqueue_incoming_tcp_connection(
760 &self,
761 listener_socket_id: SocketId,
762 peer_address: InetSocketAddress,
763 ) -> SocketResult<()> {
764 let mut table = lock_or_recover(&self.inner.state);
765 let record = table
766 .sockets
767 .get_mut(&listener_socket_id)
768 .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
769
770 if record.state != SocketState::Listening {
771 return Err(SocketTableError::invalid_argument(format!(
772 "socket {listener_socket_id} is not listening"
773 )));
774 }
775
776 let listener_state = record.listener_state.as_mut().ok_or_else(|| {
777 SocketTableError::invalid_argument(format!(
778 "socket {listener_socket_id} has no listener state"
779 ))
780 })?;
781
782 if listener_state.pending_accepts.len() >= listener_state.backlog {
783 return Err(SocketTableError::would_block(format!(
784 "listener {listener_socket_id} backlog is full"
785 )));
786 }
787
788 listener_state.pending_accepts.push_back(PendingConnection {
789 peer_address: Some(peer_address),
790 peer_unix_path: None,
791 accepted_socket_id: None,
792 });
793 Ok(())
794 }
795
796 pub fn accept(&self, listener_socket_id: SocketId) -> SocketResult<SocketRecord> {
797 let mut table = lock_or_recover(&self.inner.state);
798 let (owner_pid, spec, local_address, local_unix_path, pending) = {
799 let record = table
800 .sockets
801 .get_mut(&listener_socket_id)
802 .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
803
804 if record.state != SocketState::Listening {
805 return Err(SocketTableError::invalid_argument(format!(
806 "socket {listener_socket_id} is not listening"
807 )));
808 }
809
810 let listener_state = record.listener_state.as_mut().ok_or_else(|| {
811 SocketTableError::invalid_argument(format!(
812 "socket {listener_socket_id} has no listener state"
813 ))
814 })?;
815 let pending = listener_state.pending_accepts.pop_front().ok_or_else(|| {
816 SocketTableError::would_block(format!(
817 "listener {listener_socket_id} has no pending connections"
818 ))
819 })?;
820
821 (
822 record.owner_pid,
823 record.spec,
824 record.local_address.clone(),
825 record.local_unix_path.clone(),
826 pending,
827 )
828 };
829
830 if let Some(accepted_socket_id) = pending.accepted_socket_id {
831 return table
832 .sockets
833 .get(&accepted_socket_id)
834 .cloned()
835 .ok_or_else(|| SocketTableError::not_found(accepted_socket_id));
836 }
837
838 let socket_id = next_socket_id(&mut table);
839 let record = SocketRecord {
840 id: socket_id,
841 owner_pid,
842 spec,
843 state: SocketState::Connected,
844 local_address,
845 peer_address: pending.peer_address,
846 local_unix_path,
847 peer_unix_path: pending.peer_unix_path,
848 listener_state: None,
849 connection_state: default_connection_state(spec, SocketState::Connected),
850 datagram_state: default_datagram_state(spec),
851 };
852 table.sockets.insert(socket_id, record.clone());
853 table
854 .by_owner
855 .entry(owner_pid)
856 .or_default()
857 .insert(socket_id);
858 Ok(record)
859 }
860
861 pub fn connect_pair(
862 &self,
863 socket_id: SocketId,
864 peer_socket_id: SocketId,
865 ) -> SocketResult<(SocketRecord, SocketRecord)> {
866 if socket_id == peer_socket_id {
867 return Err(SocketTableError::invalid_argument(
868 "socket cannot connect to itself",
869 ));
870 }
871
872 let mut table = lock_or_recover(&self.inner.state);
873 let mut socket = table
874 .sockets
875 .remove(&socket_id)
876 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
877 let Some(mut peer) = table.sockets.remove(&peer_socket_id) else {
878 table.sockets.insert(socket_id, socket);
879 return Err(SocketTableError::not_found(peer_socket_id));
880 };
881
882 if let Err(error) = validate_connect_pair(&socket, &peer) {
883 table.sockets.insert(socket_id, socket);
884 table.sockets.insert(peer_socket_id, peer);
885 return Err(error);
886 }
887
888 socket.state = SocketState::Connected;
889 socket.peer_address = peer.local_address.clone();
890 socket.peer_unix_path = peer.local_unix_path.clone();
891 socket.listener_state = None;
892 socket.connection_state = Some(ConnectionState {
893 peer_socket_id: Some(peer_socket_id),
894 ..ConnectionState::default()
895 });
896
897 peer.state = SocketState::Connected;
898 peer.peer_address = socket.local_address.clone();
899 peer.peer_unix_path = socket.local_unix_path.clone();
900 peer.listener_state = None;
901 peer.connection_state = Some(ConnectionState {
902 peer_socket_id: Some(socket_id),
903 ..ConnectionState::default()
904 });
905
906 let socket_clone = socket.clone();
907 let peer_clone = peer.clone();
908 table.sockets.insert(socket_id, socket);
909 table.sockets.insert(peer_socket_id, peer);
910 Ok((socket_clone, peer_clone))
911 }
912
913 pub fn find_bound_inet_socket(
914 &self,
915 spec: SocketSpec,
916 address: &InetSocketAddress,
917 ) -> Option<SocketRecord> {
918 let address = normalize_inet_address(address.clone());
919 let table = lock_or_recover(&self.inner.state);
920 let socket_id = lookup_bound_inet_socket(&table, spec, &address)?;
921 table.sockets.get(&socket_id).cloned()
922 }
923
924 pub fn connect_to_bound_inet_stream(
925 &self,
926 socket_id: SocketId,
927 target_address: InetSocketAddress,
928 ) -> SocketResult<()> {
929 let target_address = normalize_inet_address(target_address);
930 let mut table = lock_or_recover(&self.inner.state);
931 let listener_socket_id =
932 lookup_bound_inet_socket_in_table(&table.bound_inet_streams, &target_address)
933 .ok_or_else(|| {
934 SocketTableError::not_found_address(format!(
935 "no listening socket bound at {}:{}",
936 target_address.host(),
937 target_address.port()
938 ))
939 })?;
940
941 if socket_id == listener_socket_id {
942 return Err(SocketTableError::invalid_argument(
943 "socket cannot connect to its own listening endpoint",
944 ));
945 }
946
947 let mut client = table
948 .sockets
949 .remove(&socket_id)
950 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
951 let accepted_socket_id = next_socket_id(&mut table);
952
953 let result = (|| {
954 let listener = table
955 .sockets
956 .get_mut(&listener_socket_id)
957 .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
958 validate_connect_to_listener(&client, listener)?;
959
960 let listener_state = listener.listener_state.as_mut().ok_or_else(|| {
961 SocketTableError::invalid_argument(format!(
962 "socket {listener_socket_id} has no listener state"
963 ))
964 })?;
965 if listener_state.pending_accepts.len() >= listener_state.backlog {
966 return Err(SocketTableError::would_block(format!(
967 "listener {listener_socket_id} backlog is full"
968 )));
969 }
970
971 let accepted = SocketRecord {
972 id: accepted_socket_id,
973 owner_pid: listener.owner_pid,
974 spec: listener.spec,
975 state: SocketState::Connected,
976 local_address: listener.local_address.clone(),
977 peer_address: client.local_address.clone(),
978 local_unix_path: None,
979 peer_unix_path: None,
980 listener_state: None,
981 connection_state: Some(ConnectionState {
982 peer_socket_id: Some(socket_id),
983 ..ConnectionState::default()
984 }),
985 datagram_state: default_datagram_state(listener.spec),
986 };
987
988 listener_state.pending_accepts.push_back(PendingConnection {
989 peer_address: client.local_address.clone(),
990 peer_unix_path: None,
991 accepted_socket_id: Some(accepted_socket_id),
992 });
993
994 client.state = SocketState::Connected;
995 client.peer_address = listener.local_address.clone();
996 client.peer_unix_path = None;
997 client.listener_state = None;
998 client.connection_state = Some(ConnectionState {
999 peer_socket_id: Some(accepted_socket_id),
1000 ..ConnectionState::default()
1001 });
1002
1003 Ok(accepted)
1004 })();
1005
1006 match result {
1007 Ok(accepted) => {
1008 table.sockets.insert(socket_id, client);
1009 table.sockets.insert(accepted_socket_id, accepted.clone());
1010 table
1011 .by_owner
1012 .entry(accepted.owner_pid)
1013 .or_default()
1014 .insert(accepted_socket_id);
1015 Ok(())
1016 }
1017 Err(error) => {
1018 table.sockets.insert(socket_id, client);
1019 Err(error)
1020 }
1021 }
1022 }
1023
1024 pub fn find_bound_unix_socket(&self, path: &str) -> Option<SocketRecord> {
1025 let path = normalize_unix_socket_path(path).ok()?;
1026 let table = lock_or_recover(&self.inner.state);
1027 let socket_id = table.bound_unix_streams.get(&path).copied()?;
1028 table.sockets.get(&socket_id).cloned()
1029 }
1030
1031 pub fn connect_to_bound_unix_stream(
1032 &self,
1033 socket_id: SocketId,
1034 target_path: impl Into<String>,
1035 ) -> SocketResult<()> {
1036 let target_path = normalize_unix_socket_path(target_path.into())?;
1037 let mut table = lock_or_recover(&self.inner.state);
1038 let listener_socket_id = table
1039 .bound_unix_streams
1040 .get(&target_path)
1041 .copied()
1042 .ok_or_else(|| {
1043 SocketTableError::not_found_address(format!(
1044 "no listening socket bound at path {target_path}"
1045 ))
1046 })?;
1047
1048 if socket_id == listener_socket_id {
1049 return Err(SocketTableError::invalid_argument(
1050 "socket cannot connect to its own listening endpoint",
1051 ));
1052 }
1053
1054 let mut client = table
1055 .sockets
1056 .remove(&socket_id)
1057 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1058 let accepted_socket_id = next_socket_id(&mut table);
1059
1060 let result = (|| {
1061 let listener = table
1062 .sockets
1063 .get_mut(&listener_socket_id)
1064 .ok_or_else(|| SocketTableError::not_found(listener_socket_id))?;
1065 validate_connect_to_listener(&client, listener)?;
1066
1067 let listener_state = listener.listener_state.as_mut().ok_or_else(|| {
1068 SocketTableError::invalid_argument(format!(
1069 "socket {listener_socket_id} has no listener state"
1070 ))
1071 })?;
1072 if listener_state.pending_accepts.len() >= listener_state.backlog {
1073 return Err(SocketTableError::would_block(format!(
1074 "listener {listener_socket_id} backlog is full"
1075 )));
1076 }
1077
1078 let accepted = SocketRecord {
1079 id: accepted_socket_id,
1080 owner_pid: listener.owner_pid,
1081 spec: listener.spec,
1082 state: SocketState::Connected,
1083 local_address: None,
1084 peer_address: None,
1085 local_unix_path: listener.local_unix_path.clone(),
1086 peer_unix_path: client.local_unix_path.clone(),
1087 listener_state: None,
1088 connection_state: Some(ConnectionState {
1089 peer_socket_id: Some(socket_id),
1090 ..ConnectionState::default()
1091 }),
1092 datagram_state: default_datagram_state(listener.spec),
1093 };
1094
1095 listener_state.pending_accepts.push_back(PendingConnection {
1096 peer_address: None,
1097 peer_unix_path: client.local_unix_path.clone(),
1098 accepted_socket_id: Some(accepted_socket_id),
1099 });
1100
1101 client.state = SocketState::Connected;
1102 client.peer_address = None;
1103 client.peer_unix_path = listener.local_unix_path.clone();
1104 client.listener_state = None;
1105 client.connection_state = Some(ConnectionState {
1106 peer_socket_id: Some(accepted_socket_id),
1107 ..ConnectionState::default()
1108 });
1109
1110 Ok(accepted)
1111 })();
1112
1113 match result {
1114 Ok(accepted) => {
1115 table.sockets.insert(socket_id, client);
1116 table.sockets.insert(accepted_socket_id, accepted.clone());
1117 table
1118 .by_owner
1119 .entry(accepted.owner_pid)
1120 .or_default()
1121 .insert(accepted_socket_id);
1122 Ok(())
1123 }
1124 Err(error) => {
1125 table.sockets.insert(socket_id, client);
1126 Err(error)
1127 }
1128 }
1129 }
1130
1131 pub fn send_to_bound_udp_socket(
1132 &self,
1133 socket_id: SocketId,
1134 target_address: InetSocketAddress,
1135 data: &[u8],
1136 ) -> SocketResult<usize> {
1137 let target_address = normalize_inet_address(target_address);
1138 let mut table = lock_or_recover(&self.inner.state);
1139 let sender = table
1140 .sockets
1141 .get(&socket_id)
1142 .cloned()
1143 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1144 validate_bound_udp_sender(&sender)?;
1145
1146 let receiver_socket_id = lookup_bound_inet_datagram_socket_in_table(
1147 &table.bound_inet_datagrams,
1148 &target_address,
1149 )
1150 .ok_or_else(|| {
1151 SocketTableError::not_found_address(format!(
1152 "no UDP socket bound at {}:{}",
1153 target_address.host(),
1154 target_address.port()
1155 ))
1156 })?;
1157 let receiver = table
1158 .sockets
1159 .get_mut(&receiver_socket_id)
1160 .ok_or_else(|| SocketTableError::not_found(receiver_socket_id))?;
1161 validate_bound_udp_receiver(receiver)?;
1162
1163 let datagram_state = receiver.datagram_state.as_mut().ok_or_else(|| {
1164 SocketTableError::invalid_argument(format!(
1165 "socket {receiver_socket_id} does not support datagrams"
1166 ))
1167 })?;
1168 datagram_state.recv_queue.push_back(QueuedDatagram {
1169 source_address: sender.local_address.clone(),
1170 payload: data.to_vec(),
1171 });
1172 Ok(data.len())
1173 }
1174
1175 pub fn check_send_to_bound_udp_socket(
1176 &self,
1177 socket_id: SocketId,
1178 target_address: InetSocketAddress,
1179 ) -> SocketResult<()> {
1180 let target_address = normalize_inet_address(target_address);
1181 let table = lock_or_recover(&self.inner.state);
1182 let sender = table
1183 .sockets
1184 .get(&socket_id)
1185 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1186 validate_bound_udp_sender(sender)?;
1187
1188 let receiver_socket_id = lookup_bound_inet_datagram_socket_in_table(
1189 &table.bound_inet_datagrams,
1190 &target_address,
1191 )
1192 .ok_or_else(|| {
1193 SocketTableError::not_found_address(format!(
1194 "no UDP socket bound at {}:{}",
1195 target_address.host(),
1196 target_address.port()
1197 ))
1198 })?;
1199 let receiver = table
1200 .sockets
1201 .get(&receiver_socket_id)
1202 .ok_or_else(|| SocketTableError::not_found(receiver_socket_id))?;
1203 validate_bound_udp_receiver(receiver)?;
1204 Ok(())
1205 }
1206
1207 pub fn recv_datagram(
1208 &self,
1209 socket_id: SocketId,
1210 max_bytes: usize,
1211 ) -> SocketResult<Option<ReceivedDatagram>> {
1212 let mut table = lock_or_recover(&self.inner.state);
1213 let record = table
1214 .sockets
1215 .get_mut(&socket_id)
1216 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1217 validate_bound_udp_receiver(record)?;
1218
1219 let datagram_state = record.datagram_state.as_mut().ok_or_else(|| {
1220 SocketTableError::invalid_argument(format!(
1221 "socket {socket_id} does not support datagrams"
1222 ))
1223 })?;
1224 let Some(datagram) = datagram_state.recv_queue.pop_front() else {
1225 return Err(SocketTableError::would_block(format!(
1226 "socket {socket_id} has no queued datagrams"
1227 )));
1228 };
1229
1230 let payload = if datagram.payload.len() > max_bytes {
1231 datagram.payload[..max_bytes].to_vec()
1232 } else {
1233 datagram.payload
1234 };
1235 Ok(Some(ReceivedDatagram {
1236 source_address: datagram.source_address,
1237 payload,
1238 }))
1239 }
1240
1241 pub fn poll(&self, socket_id: SocketId, requested: PollEvents) -> SocketResult<PollEvents> {
1242 let table = lock_or_recover(&self.inner.state);
1243 let record = table
1244 .sockets
1245 .get(&socket_id)
1246 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1247
1248 let mut events = PollEvents::empty();
1249 match record.state {
1250 SocketState::Listening => {
1251 if requested.intersects(POLLIN) && record.pending_accept_count() > 0 {
1252 events |= POLLIN;
1253 }
1254 }
1255 SocketState::Connected => {
1256 let connection = record.connection_state.as_ref().ok_or_else(|| {
1257 SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1258 })?;
1259 let peer = connection
1260 .peer_socket_id
1261 .and_then(|peer_socket_id| table.sockets.get(&peer_socket_id));
1262
1263 if requested.intersects(POLLIN) && !connection.recv_buffer.is_empty() {
1264 events |= POLLIN;
1265 }
1266 if connection.peer_write_shutdown || peer.is_none() {
1267 events |= POLLHUP;
1268 }
1269
1270 if requested.intersects(POLLOUT) && !connection.write_shutdown {
1271 if peer
1272 .and_then(|peer| peer.connection_state.as_ref())
1273 .map(|peer_connection| peer_connection.read_shutdown)
1274 .unwrap_or(true)
1275 {
1276 events |= POLLERR;
1277 } else {
1278 events |= POLLOUT;
1279 }
1280 }
1281 }
1282 SocketState::Bound if supports_inet_datagram_lifecycle(record.spec) => {
1283 let datagram_state = record.datagram_state.as_ref().ok_or_else(|| {
1284 SocketTableError::invalid_argument(format!(
1285 "socket {socket_id} does not support datagrams"
1286 ))
1287 })?;
1288 if requested.intersects(POLLIN) && !datagram_state.recv_queue.is_empty() {
1289 events |= POLLIN;
1290 }
1291 if requested.intersects(POLLOUT) {
1292 events |= POLLOUT;
1293 }
1294 }
1295 SocketState::Created | SocketState::Bound => {}
1296 }
1297
1298 Ok(events)
1299 }
1300
1301 pub fn write(&self, socket_id: SocketId, data: &[u8]) -> SocketResult<usize> {
1302 let mut table = lock_or_recover(&self.inner.state);
1303 let record = table
1304 .sockets
1305 .get(&socket_id)
1306 .cloned()
1307 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1308 let connection = record.connection_state.as_ref().ok_or_else(|| {
1309 SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1310 })?;
1311 if record.state != SocketState::Connected {
1312 return Err(SocketTableError::not_connected(format!(
1313 "socket {socket_id} is not connected"
1314 )));
1315 }
1316 if connection.write_shutdown {
1317 return Err(SocketTableError::broken_pipe(format!(
1318 "socket {socket_id} write side is shut down"
1319 )));
1320 }
1321
1322 let peer_socket_id = connection.peer_socket_id.ok_or_else(|| {
1323 SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1324 })?;
1325 let peer = table.sockets.get_mut(&peer_socket_id).ok_or_else(|| {
1326 SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1327 })?;
1328 let peer_connection = peer.connection_state.as_mut().ok_or_else(|| {
1329 SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1330 })?;
1331 if peer_connection.read_shutdown {
1332 return Err(SocketTableError::broken_pipe(format!(
1333 "socket {peer_socket_id} read side is shut down"
1334 )));
1335 }
1336
1337 peer_connection.recv_buffer.extend(data.iter().copied());
1338 Ok(data.len())
1339 }
1340
1341 pub fn check_write(&self, socket_id: SocketId) -> SocketResult<()> {
1342 let table = lock_or_recover(&self.inner.state);
1343 let record = table
1344 .sockets
1345 .get(&socket_id)
1346 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1347 let connection = record.connection_state.as_ref().ok_or_else(|| {
1348 SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1349 })?;
1350 if record.state != SocketState::Connected {
1351 return Err(SocketTableError::not_connected(format!(
1352 "socket {socket_id} is not connected"
1353 )));
1354 }
1355 if connection.write_shutdown {
1356 return Err(SocketTableError::broken_pipe(format!(
1357 "socket {socket_id} write side is shut down"
1358 )));
1359 }
1360
1361 let peer_socket_id = connection.peer_socket_id.ok_or_else(|| {
1362 SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1363 })?;
1364 let peer = table.sockets.get(&peer_socket_id).ok_or_else(|| {
1365 SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1366 })?;
1367 let peer_connection = peer.connection_state.as_ref().ok_or_else(|| {
1368 SocketTableError::broken_pipe(format!("socket {socket_id} peer is closed"))
1369 })?;
1370 if peer_connection.read_shutdown {
1371 return Err(SocketTableError::broken_pipe(format!(
1372 "socket {peer_socket_id} read side is shut down"
1373 )));
1374 }
1375
1376 Ok(())
1377 }
1378
1379 pub fn read(&self, socket_id: SocketId, max_bytes: usize) -> SocketResult<Option<Vec<u8>>> {
1380 if max_bytes == 0 {
1381 return Ok(Some(Vec::new()));
1382 }
1383
1384 let mut table = lock_or_recover(&self.inner.state);
1385 let record = table
1386 .sockets
1387 .get(&socket_id)
1388 .cloned()
1389 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1390 if record.state != SocketState::Connected {
1391 return Err(SocketTableError::not_connected(format!(
1392 "socket {socket_id} is not connected"
1393 )));
1394 }
1395
1396 let connection = record.connection_state.as_ref().ok_or_else(|| {
1397 SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1398 })?;
1399 if connection.read_shutdown {
1400 return Ok(None);
1401 }
1402 if !connection.recv_buffer.is_empty() {
1403 let record = table
1404 .sockets
1405 .get_mut(&socket_id)
1406 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1407 let connection = record.connection_state.as_mut().ok_or_else(|| {
1408 SocketTableError::not_connected(format!("socket {socket_id} is not connected"))
1409 })?;
1410 let read_len = connection.recv_buffer.len().min(max_bytes);
1411 let bytes = connection.recv_buffer.drain(..read_len).collect::<Vec<_>>();
1412 return Ok(Some(bytes));
1413 }
1414
1415 let peer_open = connection
1416 .peer_socket_id
1417 .map(|peer_socket_id| table.sockets.contains_key(&peer_socket_id))
1418 .unwrap_or(false);
1419 if connection.peer_write_shutdown || !peer_open {
1420 return Ok(None);
1421 }
1422
1423 Err(SocketTableError::would_block(format!(
1424 "socket {socket_id} has no readable data"
1425 )))
1426 }
1427
1428 pub fn shutdown(&self, socket_id: SocketId, how: SocketShutdown) -> SocketResult<SocketRecord> {
1429 let mut table = lock_or_recover(&self.inner.state);
1430 let record = table
1431 .sockets
1432 .remove(&socket_id)
1433 .ok_or_else(|| SocketTableError::not_found(socket_id))?;
1434
1435 if record.state != SocketState::Connected {
1436 table.sockets.insert(socket_id, record);
1437 return Err(SocketTableError::not_connected(format!(
1438 "socket {socket_id} is not connected"
1439 )));
1440 }
1441
1442 let Some(mut connection) = record.connection_state.clone() else {
1443 table.sockets.insert(socket_id, record);
1444 return Err(SocketTableError::not_connected(format!(
1445 "socket {socket_id} is not connected"
1446 )));
1447 };
1448
1449 if matches!(how, SocketShutdown::Read | SocketShutdown::Both) {
1450 connection.recv_buffer.clear();
1451 connection.read_shutdown = true;
1452 }
1453 if matches!(how, SocketShutdown::Write | SocketShutdown::Both) {
1454 connection.write_shutdown = true;
1455 if let Some(peer_socket_id) = connection.peer_socket_id {
1456 if let Some(peer) = table.sockets.get_mut(&peer_socket_id) {
1457 if let Some(peer_connection) = peer.connection_state.as_mut() {
1458 peer_connection.peer_write_shutdown = true;
1459 }
1460 }
1461 }
1462 }
1463
1464 let mut record = record;
1465 record.connection_state = Some(connection);
1466 let cloned = record.clone();
1467 table.sockets.insert(socket_id, record);
1468 Ok(cloned)
1469 }
1470
1471 pub fn remove(&self, socket_id: SocketId) -> SocketResult<SocketRecord> {
1472 let mut table = lock_or_recover(&self.inner.state);
1473 remove_socket(&mut table, socket_id).ok_or_else(|| SocketTableError::not_found(socket_id))
1474 }
1475
1476 pub fn remove_all_for_pid(&self, owner_pid: u32) -> Vec<SocketRecord> {
1477 let mut table = lock_or_recover(&self.inner.state);
1478 let Some(socket_ids) = table.by_owner.remove(&owner_pid) else {
1479 return Vec::new();
1480 };
1481
1482 socket_ids
1483 .into_iter()
1484 .filter_map(|socket_id| remove_socket(&mut table, socket_id))
1485 .collect()
1486 }
1487
1488 pub fn snapshot(&self) -> SocketTableSnapshot {
1489 let table = lock_or_recover(&self.inner.state);
1490 let mut snapshot = SocketTableSnapshot {
1491 sockets: table.sockets.len(),
1492 ..SocketTableSnapshot::default()
1493 };
1494 for record in table.sockets.values() {
1495 if record.state.counts_as_listener() {
1496 snapshot.listeners += 1;
1497 }
1498 if record.state.counts_as_connection() {
1499 snapshot.connections += 1;
1500 }
1501 if let Some(connection) = &record.connection_state {
1502 snapshot.buffered_bytes = snapshot
1503 .buffered_bytes
1504 .saturating_add(connection.recv_buffer.len());
1505 }
1506 if let Some(datagram_state) = &record.datagram_state {
1507 snapshot.datagram_queue_len = snapshot
1508 .datagram_queue_len
1509 .saturating_add(datagram_state.recv_queue.len());
1510 snapshot.buffered_bytes = snapshot
1511 .buffered_bytes
1512 .saturating_add(datagram_queue_bytes(&datagram_state.recv_queue));
1513 }
1514 }
1515 snapshot
1516 }
1517}
1518
1519fn datagram_queue_bytes(queue: &VecDeque<QueuedDatagram>) -> usize {
1520 queue
1521 .iter()
1522 .map(|datagram| datagram.payload.len())
1523 .sum::<usize>()
1524}
1525
1526fn next_socket_id(table: &mut SocketTableState) -> SocketId {
1527 if table.next_socket_id == 0 {
1528 table.next_socket_id = 1;
1529 }
1530 let socket_id = table.next_socket_id;
1531 table.next_socket_id = table.next_socket_id.saturating_add(1);
1532 socket_id
1533}
1534
1535fn validate_state_transition(current: SocketState, next: SocketState) -> SocketResult<()> {
1536 if current == SocketState::Connected && next != SocketState::Connected {
1537 return Err(SocketTableError::invalid_argument(format!(
1538 "invalid socket state transition from {current:?} to {next:?}"
1539 )));
1540 }
1541 Ok(())
1542}
1543
1544fn validate_connect_pair(socket: &SocketRecord, peer: &SocketRecord) -> SocketResult<()> {
1545 if !supports_connection_lifecycle(socket.spec) {
1546 return Err(SocketTableError::invalid_argument(format!(
1547 "socket {} does not support stream connections",
1548 socket.id
1549 )));
1550 }
1551 if !supports_connection_lifecycle(peer.spec) {
1552 return Err(SocketTableError::invalid_argument(format!(
1553 "socket {} does not support stream connections",
1554 peer.id
1555 )));
1556 }
1557 if !matches!(socket.state, SocketState::Created | SocketState::Bound) {
1558 return Err(SocketTableError::invalid_argument(format!(
1559 "socket {} cannot connect in state {:?}",
1560 socket.id, socket.state
1561 )));
1562 }
1563 if !matches!(peer.state, SocketState::Created | SocketState::Bound) {
1564 return Err(SocketTableError::invalid_argument(format!(
1565 "socket {} cannot connect in state {:?}",
1566 peer.id, peer.state
1567 )));
1568 }
1569 Ok(())
1570}
1571
1572fn default_connection_state(spec: SocketSpec, state: SocketState) -> Option<ConnectionState> {
1573 if state == SocketState::Connected && supports_connection_lifecycle(spec) {
1574 Some(ConnectionState::default())
1575 } else {
1576 None
1577 }
1578}
1579
1580fn default_datagram_state(spec: SocketSpec) -> Option<DatagramState> {
1581 if supports_inet_datagram_lifecycle(spec) {
1582 Some(DatagramState::default())
1583 } else {
1584 None
1585 }
1586}
1587
1588fn supports_connection_lifecycle(spec: SocketSpec) -> bool {
1589 matches!(spec.socket_type, SocketType::Stream)
1590}
1591
1592fn supports_listener_lifecycle(spec: SocketSpec) -> bool {
1593 matches!(spec.socket_type, SocketType::Stream)
1594 && matches!(
1595 spec.domain,
1596 SocketDomain::Inet | SocketDomain::Inet6 | SocketDomain::Unix
1597 )
1598}
1599
1600fn supports_inet_bind(spec: SocketSpec) -> bool {
1601 matches!(spec.domain, SocketDomain::Inet | SocketDomain::Inet6)
1602 && matches!(spec.socket_type, SocketType::Stream | SocketType::Datagram)
1603}
1604
1605fn supports_unix_stream_lifecycle(spec: SocketSpec) -> bool {
1606 matches!(spec.socket_type, SocketType::Stream) && matches!(spec.domain, SocketDomain::Unix)
1607}
1608
1609fn supports_inet_stream_lifecycle(spec: SocketSpec) -> bool {
1610 matches!(spec.socket_type, SocketType::Stream)
1611 && matches!(spec.domain, SocketDomain::Inet | SocketDomain::Inet6)
1612}
1613
1614fn supports_inet_datagram_lifecycle(spec: SocketSpec) -> bool {
1615 matches!(spec.socket_type, SocketType::Datagram)
1616 && matches!(spec.domain, SocketDomain::Inet | SocketDomain::Inet6)
1617}
1618
1619fn lookup_conflicting_bound_inet_socket_ids(
1620 table: &SocketTableState,
1621 spec: SocketSpec,
1622 address: &InetSocketAddress,
1623) -> Vec<SocketId> {
1624 if supports_inet_stream_lifecycle(spec) {
1625 table
1626 .bound_inet_streams
1627 .iter()
1628 .find_map(|(bound_address, socket_id)| {
1629 inet_stream_bind_addresses_overlap(bound_address, address).then_some(*socket_id)
1630 })
1631 .into_iter()
1632 .collect()
1633 } else if supports_inet_datagram_lifecycle(spec) {
1634 table
1635 .bound_inet_datagrams
1636 .iter()
1637 .filter(|(bound_address, _)| inet_stream_bind_addresses_overlap(bound_address, address))
1638 .flat_map(|(_, socket_ids)| socket_ids.iter().copied())
1639 .collect()
1640 } else {
1641 Vec::new()
1642 }
1643}
1644
1645fn lookup_bound_inet_socket(
1646 table: &SocketTableState,
1647 spec: SocketSpec,
1648 address: &InetSocketAddress,
1649) -> Option<SocketId> {
1650 if supports_inet_stream_lifecycle(spec) {
1651 lookup_bound_inet_socket_in_table(&table.bound_inet_streams, address)
1652 } else if supports_inet_datagram_lifecycle(spec) {
1653 lookup_bound_inet_datagram_socket_in_table(&table.bound_inet_datagrams, address)
1654 } else {
1655 None
1656 }
1657}
1658
1659fn inet_stream_bind_addresses_overlap(
1660 existing: &InetSocketAddress,
1661 requested: &InetSocketAddress,
1662) -> bool {
1663 if existing == requested {
1664 return true;
1665 }
1666
1667 wildcard_inet_address(existing).as_ref() == Some(requested)
1668 || wildcard_inet_address(requested).as_ref() == Some(existing)
1669}
1670
1671fn lookup_bound_inet_socket_in_table(
1672 sockets: &BTreeMap<InetSocketAddress, SocketId>,
1673 address: &InetSocketAddress,
1674) -> Option<SocketId> {
1675 sockets.get(address).copied().or_else(|| {
1676 wildcard_inet_address(address).and_then(|wildcard| sockets.get(&wildcard).copied())
1677 })
1678}
1679
1680fn lookup_bound_inet_datagram_socket_in_table(
1681 sockets: &BTreeMap<InetSocketAddress, BTreeSet<SocketId>>,
1682 address: &InetSocketAddress,
1683) -> Option<SocketId> {
1684 sockets
1685 .get(address)
1686 .and_then(|socket_ids| socket_ids.first().copied())
1687 .or_else(|| {
1688 wildcard_inet_address(address).and_then(|wildcard| {
1689 sockets
1690 .get(&wildcard)
1691 .and_then(|socket_ids| socket_ids.first().copied())
1692 })
1693 })
1694}
1695
1696fn register_bound_inet_socket(
1697 table: &mut SocketTableState,
1698 spec: SocketSpec,
1699 address: InetSocketAddress,
1700 socket_id: SocketId,
1701) {
1702 if supports_inet_stream_lifecycle(spec) {
1703 table.bound_inet_streams.insert(address, socket_id);
1704 } else if supports_inet_datagram_lifecycle(spec) {
1705 table
1706 .bound_inet_datagrams
1707 .entry(address)
1708 .or_default()
1709 .insert(socket_id);
1710 }
1711}
1712
1713fn validate_connect_to_listener(
1714 client: &SocketRecord,
1715 listener: &SocketRecord,
1716) -> SocketResult<()> {
1717 if !supports_connection_lifecycle(client.spec) {
1718 return Err(SocketTableError::invalid_argument(format!(
1719 "socket {} does not support stream connections",
1720 client.id
1721 )));
1722 }
1723 if !supports_listener_lifecycle(listener.spec) {
1724 return Err(SocketTableError::invalid_argument(format!(
1725 "socket {} is not a stream listener",
1726 listener.id
1727 )));
1728 }
1729 if !matches!(client.state, SocketState::Created | SocketState::Bound) {
1730 return Err(SocketTableError::invalid_argument(format!(
1731 "socket {} cannot connect in state {:?}",
1732 client.id, client.state
1733 )));
1734 }
1735 if listener.state != SocketState::Listening {
1736 return Err(SocketTableError::invalid_argument(format!(
1737 "socket {} is not listening",
1738 listener.id
1739 )));
1740 }
1741 Ok(())
1742}
1743
1744fn has_bound_endpoint(record: &SocketRecord) -> bool {
1745 record.local_address.is_some() || record.local_unix_path.is_some()
1746}
1747
1748fn validate_bound_udp_sender(sender: &SocketRecord) -> SocketResult<()> {
1749 if !supports_inet_datagram_lifecycle(sender.spec) {
1750 return Err(SocketTableError::invalid_argument(format!(
1751 "socket {} is not an INET datagram socket",
1752 sender.id
1753 )));
1754 }
1755 if sender.state != SocketState::Bound || sender.local_address.is_none() {
1756 return Err(SocketTableError::invalid_argument(format!(
1757 "socket {} must be bound before sending datagrams",
1758 sender.id
1759 )));
1760 }
1761 Ok(())
1762}
1763
1764fn validate_bound_udp_receiver(receiver: &SocketRecord) -> SocketResult<()> {
1765 if !supports_inet_datagram_lifecycle(receiver.spec) {
1766 return Err(SocketTableError::invalid_argument(format!(
1767 "socket {} is not an INET datagram socket",
1768 receiver.id
1769 )));
1770 }
1771 if receiver.state != SocketState::Bound || receiver.local_address.is_none() {
1772 return Err(SocketTableError::invalid_argument(format!(
1773 "socket {} must be bound to receive datagrams",
1774 receiver.id
1775 )));
1776 }
1777 Ok(())
1778}
1779
1780fn datagram_state_mut(record: &mut SocketRecord) -> SocketResult<&mut DatagramState> {
1781 if !supports_inet_datagram_lifecycle(record.spec) {
1782 return Err(SocketTableError::invalid_argument(format!(
1783 "socket {} is not an INET datagram socket",
1784 record.id
1785 )));
1786 }
1787 record.datagram_state.as_mut().ok_or_else(|| {
1788 SocketTableError::invalid_argument(format!(
1789 "socket {} does not support datagrams",
1790 record.id
1791 ))
1792 })
1793}
1794
1795fn validate_multicast_socket(record: &SocketRecord) -> SocketResult<()> {
1796 validate_bound_udp_receiver(record)?;
1797 if record.spec.domain != SocketDomain::Inet {
1798 return Err(SocketTableError::invalid_argument(format!(
1799 "socket {} multicast membership is only implemented for IPv4 datagrams",
1800 record.id
1801 )));
1802 }
1803 Ok(())
1804}
1805
1806fn normalize_multicast_membership(
1807 spec: SocketSpec,
1808 membership: SocketMulticastMembership,
1809) -> SocketResult<SocketMulticastMembership> {
1810 let group_address = membership.group_address.trim().to_ascii_lowercase();
1811 let interface_address = membership
1812 .interface_address
1813 .map(|value| value.trim().to_ascii_lowercase())
1814 .filter(|value| !value.is_empty());
1815
1816 match spec.domain {
1817 SocketDomain::Inet => {
1818 let parsed = group_address.parse::<Ipv4Addr>().map_err(|_| {
1819 SocketTableError::invalid_argument(format!(
1820 "invalid IPv4 multicast address {group_address}"
1821 ))
1822 })?;
1823 if !parsed.is_multicast() {
1824 return Err(SocketTableError::invalid_argument(format!(
1825 "address {group_address} is not an IPv4 multicast group"
1826 )));
1827 }
1828 }
1829 SocketDomain::Inet6 => {
1830 let parsed = group_address.parse::<Ipv6Addr>().map_err(|_| {
1831 SocketTableError::invalid_argument(format!(
1832 "invalid IPv6 multicast address {group_address}"
1833 ))
1834 })?;
1835 if !parsed.is_multicast() {
1836 return Err(SocketTableError::invalid_argument(format!(
1837 "address {group_address} is not an IPv6 multicast group"
1838 )));
1839 }
1840 }
1841 SocketDomain::Unix => {
1842 return Err(SocketTableError::invalid_argument(
1843 "unix sockets do not support multicast membership",
1844 ));
1845 }
1846 }
1847
1848 Ok(SocketMulticastMembership::new(
1849 group_address,
1850 interface_address,
1851 ))
1852}
1853
1854fn has_incompatible_inet_bind_conflict(
1855 table: &SocketTableState,
1856 record: &SocketRecord,
1857 conflicting_ids: &[SocketId],
1858) -> bool {
1859 conflicting_ids.iter().any(|conflicting_id| {
1860 if *conflicting_id == record.id {
1861 return false;
1862 }
1863
1864 let Some(existing) = table.sockets.get(conflicting_id) else {
1865 return false;
1866 };
1867
1868 if supports_inet_datagram_lifecycle(record.spec) {
1869 !inet_datagram_bind_shares_port(record, existing)
1870 } else {
1871 true
1872 }
1873 })
1874}
1875
1876fn inet_datagram_bind_shares_port(requested: &SocketRecord, existing: &SocketRecord) -> bool {
1877 (requested.reuse_port() && existing.reuse_port())
1878 || (requested.reuse_address() && existing.reuse_address())
1879}
1880
1881fn remove_socket(table: &mut SocketTableState, socket_id: SocketId) -> Option<SocketRecord> {
1882 let record = table.sockets.remove(&socket_id)?;
1883 unregister_bound_socket(table, &record);
1884 unregister_multicast_memberships(table, &record);
1885 if let Some(listener_state) = record.listener_state.as_ref() {
1886 let pending_socket_ids = listener_state
1887 .pending_accepts
1888 .iter()
1889 .filter_map(|pending| pending.accepted_socket_id)
1890 .collect::<Vec<_>>();
1891 for pending_socket_id in pending_socket_ids {
1892 let _ = remove_socket(table, pending_socket_id);
1893 }
1894 }
1895 if let Some(connection) = record.connection_state.as_ref() {
1896 if let Some(peer_socket_id) = connection.peer_socket_id {
1897 if let Some(peer) = table.sockets.get_mut(&peer_socket_id) {
1898 if let Some(peer_connection) = peer.connection_state.as_mut() {
1899 if peer_connection.peer_socket_id == Some(socket_id) {
1900 peer_connection.peer_socket_id = None;
1901 }
1902 peer_connection.peer_write_shutdown = true;
1903 }
1904 }
1905 }
1906 }
1907 if let Some(owner_sockets) = table.by_owner.get_mut(&record.owner_pid) {
1908 owner_sockets.remove(&socket_id);
1909 if owner_sockets.is_empty() {
1910 table.by_owner.remove(&record.owner_pid);
1911 }
1912 }
1913 Some(record)
1914}
1915
1916fn unregister_bound_socket(table: &mut SocketTableState, record: &SocketRecord) {
1917 let Some(address) = record.local_address.as_ref() else {
1918 if supports_unix_stream_lifecycle(record.spec) {
1919 if let Some(path) = record.local_unix_path.as_ref() {
1920 if table.bound_unix_streams.get(path).copied() == Some(record.id) {
1921 table.bound_unix_streams.remove(path);
1922 }
1923 }
1924 }
1925 return;
1926 };
1927 if supports_inet_stream_lifecycle(record.spec)
1928 && table.bound_inet_streams.get(address).copied() == Some(record.id)
1929 {
1930 table.bound_inet_streams.remove(address);
1931 }
1932 if supports_inet_datagram_lifecycle(record.spec) {
1933 if let Some(socket_ids) = table.bound_inet_datagrams.get_mut(address) {
1934 socket_ids.remove(&record.id);
1935 if socket_ids.is_empty() {
1936 table.bound_inet_datagrams.remove(address);
1937 }
1938 }
1939 }
1940}
1941
1942fn unregister_multicast_memberships(table: &mut SocketTableState, record: &SocketRecord) {
1943 let Some(datagram_state) = record.datagram_state.as_ref() else {
1944 return;
1945 };
1946
1947 for membership in &datagram_state.multicast_memberships {
1948 if let Some(socket_ids) = table.multicast_groups.get_mut(membership) {
1949 socket_ids.remove(&record.id);
1950 if socket_ids.is_empty() {
1951 table.multicast_groups.remove(membership);
1952 }
1953 }
1954 }
1955}
1956
1957fn normalize_inet_address(address: InetSocketAddress) -> InetSocketAddress {
1958 match address.host().to_ascii_lowercase().as_str() {
1959 "localhost" => InetSocketAddress::new("127.0.0.1", address.port()),
1960 _ => address,
1961 }
1962}
1963
1964fn wildcard_inet_address(address: &InetSocketAddress) -> Option<InetSocketAddress> {
1965 match address.host() {
1966 "127.0.0.1" => Some(InetSocketAddress::new("0.0.0.0", address.port())),
1967 "::1" => Some(InetSocketAddress::new("::", address.port())),
1968 _ => None,
1969 }
1970}
1971
1972fn normalize_unix_socket_path(path: impl AsRef<str>) -> SocketResult<String> {
1973 let normalized = normalize_path(path.as_ref());
1974 if normalized == "/" {
1975 return Err(SocketTableError::invalid_argument(
1976 "unix socket path must not be empty or root",
1977 ));
1978 }
1979 Ok(normalized)
1980}
1981
/// Locks `mutex`, returning the guard even if the mutex is poisoned.
///
/// A poisoned lock is recovered by taking the guard out of the poison error, so callers
/// never observe the poisoning.
fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>) -> MutexGuard<'a, T> {
    mutex
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}