netgauze_flow_service/
lib.rs1pub mod flow_actor;
17pub mod flow_supervisor;
18
19use netgauze_flow_pkt::FlowInfo;
20use std::fmt::Display;
21use std::io;
22use std::net::SocketAddr;
23use std::sync::Arc;
24
25pub type ActorId = u32;
26pub type SubscriberId = u32;
27pub type FlowRequest = (SocketAddr, FlowInfo);
28
29pub type FlowSender = async_channel::Sender<Arc<FlowRequest>>;
30pub type FlowReceiver = async_channel::Receiver<Arc<FlowRequest>>;
31
32pub fn create_flow_channel(buffer_size: usize) -> (FlowSender, FlowReceiver) {
33 async_channel::bounded(buffer_size)
34}
35
36#[derive(Debug, Clone)]
37pub struct Subscription {
38 actor_id: ActorId,
39 id: SubscriberId,
40}
41
42impl Display for Subscription {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(
45 f,
46 "Subscription {{ actor_id: {}, id: {} }}",
47 self.actor_id, self.id
48 )
49 }
50}
51
52#[allow(unused_variables)]
58pub fn new_udp_reuse_port(
59 local_addr: SocketAddr,
60 device: Option<String>,
61) -> io::Result<tokio::net::UdpSocket> {
62 let udp_sock = socket2::Socket::new(
63 if local_addr.is_ipv4() {
64 socket2::Domain::IPV4
65 } else {
66 socket2::Domain::IPV6
67 },
68 socket2::Type::DGRAM,
69 None,
70 )?;
71 #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
72 udp_sock.set_reuse_port(true)?;
73 udp_sock.set_recv_buffer_size(1024 * 1024 * 20)?; #[cfg(unix)]
75 udp_sock.set_cloexec(true)?;
77 udp_sock.set_nonblocking(true)?;
78 #[cfg(any(
83 target_os = "ios",
84 target_os = "macos",
85 target_os = "tvos",
86 target_os = "watchos",
87 target_os = "android",
88 target_os = "fuchsia",
89 target_os = "linux"
90 ))]
91 if let Some(name) = device {
92 #[cfg(any(
93 target_os = "ios",
94 target_os = "macos",
95 target_os = "tvos",
96 target_os = "watchos",
97 ))]
98 {
99 let c_str = std::ffi::CString::new(name)?;
100 let c_index = unsafe { libc::if_nametoindex(c_str.as_ptr() as *const libc::c_char) };
101 let index = std::num::NonZeroU32::new(c_index as u32);
102 if local_addr.is_ipv4() {
103 udp_sock.bind_device_by_index_v4(index)?;
104 } else {
105 udp_sock.bind_device_by_index_v6(index)?;
106 }
107 }
108 #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
109 udp_sock.bind_device(Some(name.as_bytes()))?
110 }
111 udp_sock.bind(&socket2::SockAddr::from(local_addr))?;
112 let udp_sock: std::net::UdpSocket = udp_sock.into();
113 udp_sock.try_into()
114}