netgauze_udp_notif_service/
lib.rs

// Copyright (C) 2023-present The NetGauze Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16use netgauze_udp_notif_pkt::UdpNotifPacket;
17use std::{fmt::Display, io, net::SocketAddr, sync::Arc};
18
19pub mod actor;
20pub mod supervisor;
21
/// Numeric identifier of an actor.
pub type ActorId = u32;
/// Numeric identifier of a subscriber.
pub type SubscriberId = u32;
/// A UDP-Notif packet paired with a socket address.
// NOTE(review): the address is presumably the peer the packet was received
// from — confirm against the actor implementation.
pub type UdpNotifRequest = (SocketAddr, UdpNotifPacket);

/// Sending half of an `async_channel` carrying shared (`Arc`) [`UdpNotifRequest`]s.
pub type UdpNotifSender = async_channel::Sender<Arc<UdpNotifRequest>>;
/// Receiving half of an `async_channel` carrying shared (`Arc`) [`UdpNotifRequest`]s.
pub type UdpNotifReceiver = async_channel::Receiver<Arc<UdpNotifRequest>>;
28
29pub fn create_udp_notif_channel(buffer_size: usize) -> (UdpNotifSender, UdpNotifReceiver) {
30    async_channel::bounded(buffer_size)
31}
32
33#[derive(Debug, Clone)]
34pub struct Subscription {
35    actor_id: ActorId,
36    id: SubscriberId,
37}
38
39impl Display for Subscription {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        write!(
42            f,
43            "subscription {{ actor_id: {}, id: {} }}",
44            self.actor_id, self.id
45        )
46    }
47}
48
49/// Enable socket reuse and bind to a device or a VRF on selected platforms.
50/// Binding to a device or VRF is supported on: MacOS and Linux.
51/// Unused variables is enabled to silence the Clippy error for platforms
52/// that doesn't support binding to an interface.
53#[allow(unused_variables)]
54pub fn new_udp_reuse_port(
55    local_addr: SocketAddr,
56    device: Option<String>,
57) -> io::Result<tokio::net::UdpSocket> {
58    let udp_sock = socket2::Socket::new(
59        if local_addr.is_ipv4() {
60            socket2::Domain::IPV4
61        } else {
62            socket2::Domain::IPV6
63        },
64        socket2::Type::DGRAM,
65        None,
66    )?;
67    #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
68    udp_sock.set_reuse_port(true)?;
69    udp_sock.set_recv_buffer_size(1024 * 1024 * 20)?; // 20 MB
70    #[cfg(unix)]
71    // from tokio-rs/mio/blob/master/src/sys/unix/net.rs
72    udp_sock.set_cloexec(true)?;
73    udp_sock.set_nonblocking(true)?;
74    // Binding a socket to a device or VRF is a platform specific operation,
75    // hence we guard it for only a selected subset of target platforms.
76    // The first cfg block filters for the supported platforms to avoid Clippy
77    // errors about unused `name` for the unsupported platforms.
78    #[cfg(any(
79        target_os = "ios",
80        target_os = "macos",
81        target_os = "tvos",
82        target_os = "watchos",
83        target_os = "android",
84        target_os = "fuchsia",
85        target_os = "linux"
86    ))]
87    if let Some(name) = device {
88        #[cfg(any(
89            target_os = "ios",
90            target_os = "macos",
91            target_os = "tvos",
92            target_os = "watchos",
93        ))]
94        {
95            let c_str = std::ffi::CString::new(name)?;
96            let c_index = unsafe { libc::if_nametoindex(c_str.as_ptr() as *const libc::c_char) };
97            let index = std::num::NonZeroU32::new(c_index as u32);
98            if local_addr.is_ipv4() {
99                udp_sock.bind_device_by_index_v4(index)?;
100            } else {
101                udp_sock.bind_device_by_index_v6(index)?;
102            }
103        }
104        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
105        udp_sock.bind_device(Some(name.as_bytes()))?
106    }
107    udp_sock.bind(&socket2::SockAddr::from(local_addr))?;
108    let udp_sock: std::net::UdpSocket = udp_sock.into();
109    udp_sock.try_into()
110}