Skip to main content

netgauze_flow_service/
lib.rs

1// Copyright (C) 2023-present The NetGauze Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12// implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16pub mod flow_actor;
17pub mod flow_supervisor;
18
19use netgauze_flow_pkt::FlowInfo;
20use std::fmt::Display;
21use std::io;
22use std::net::SocketAddr;
23use std::sync::Arc;
24
25pub type ActorId = u32;
26pub type SubscriberId = u32;
27pub type FlowRequest = (SocketAddr, FlowInfo);
28
29pub type FlowSender = async_channel::Sender<Arc<FlowRequest>>;
30pub type FlowReceiver = async_channel::Receiver<Arc<FlowRequest>>;
31
32pub fn create_flow_channel(buffer_size: usize) -> (FlowSender, FlowReceiver) {
33    async_channel::bounded(buffer_size)
34}
35
/// Pairs an actor id with a subscriber id.
#[derive(Debug, Clone)]
pub struct Subscription {
    /// Actor id part of the subscription.
    actor_id: ActorId,
    /// Subscriber id part of the subscription.
    id: SubscriberId,
}
41
42impl Display for Subscription {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(
45            f,
46            "Subscription {{ actor_id: {}, id: {} }}",
47            self.actor_id, self.id
48        )
49    }
50}
51
52/// Enable socket reuse and bind to a device or a VRF on selected platforms.
53/// Binding to device or VRF is supported on: MacOS and Linux.
54///
55/// Unused variables is enabled to silence the Clippy error for platforms
56/// that doesn't support binding to an interface.
57#[allow(unused_variables)]
58pub fn new_udp_reuse_port(
59    local_addr: SocketAddr,
60    device: Option<String>,
61) -> io::Result<tokio::net::UdpSocket> {
62    let udp_sock = socket2::Socket::new(
63        if local_addr.is_ipv4() {
64            socket2::Domain::IPV4
65        } else {
66            socket2::Domain::IPV6
67        },
68        socket2::Type::DGRAM,
69        None,
70    )?;
71    #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
72    udp_sock.set_reuse_port(true)?;
73    udp_sock.set_recv_buffer_size(1024 * 1024 * 20)?; // 20 MB
74    #[cfg(unix)]
75    // from tokio-rs/mio/blob/master/src/sys/unix/net.rs
76    udp_sock.set_cloexec(true)?;
77    udp_sock.set_nonblocking(true)?;
78    // Binding a socket to a device or VRF is platform specific operation,
79    // hence we guard it for only selected subset of target platforms.
80    // The first cfg block filter for all supported platforms to avoid Clippy errors
81    // on unused `name` for the unsupported platforms.
82    #[cfg(any(
83        target_os = "ios",
84        target_os = "macos",
85        target_os = "tvos",
86        target_os = "watchos",
87        target_os = "android",
88        target_os = "fuchsia",
89        target_os = "linux"
90    ))]
91    if let Some(name) = device {
92        #[cfg(any(
93            target_os = "ios",
94            target_os = "macos",
95            target_os = "tvos",
96            target_os = "watchos",
97        ))]
98        {
99            let c_str = std::ffi::CString::new(name)?;
100            let c_index = unsafe { libc::if_nametoindex(c_str.as_ptr() as *const libc::c_char) };
101            let index = std::num::NonZeroU32::new(c_index as u32);
102            if local_addr.is_ipv4() {
103                udp_sock.bind_device_by_index_v4(index)?;
104            } else {
105                udp_sock.bind_device_by_index_v6(index)?;
106            }
107        }
108        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
109        udp_sock.bind_device(Some(name.as_bytes()))?
110    }
111    udp_sock.bind(&socket2::SockAddr::from(local_addr))?;
112    let udp_sock: std::net::UdpSocket = udp_sock.into();
113    udp_sock.try_into()
114}