1use crate::common::interface;
7use crate::common::interface::IP;
8use crate::error::{invalid_socket_value, Error};
9use std::convert::TryFrom;
10use std::io::ErrorKind as IOErrorKind;
11use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket};
12use std::time::Duration;
13use tracing::{debug, error, trace};
14
15#[derive(Clone, Debug)]
22pub struct Options {
23 pub(crate) network_interface: Option<String>,
24 pub(crate) network_version: Option<IP>,
25 pub(crate) local_port: u16,
26 pub(crate) recv_timeout: u64,
27 pub(crate) packet_ttl: u32,
28 pub(crate) local_network_only: bool,
29 pub(crate) loop_back_also: bool,
30 }
32
33pub const DEFAULT_BUFFER_SIZE: usize = 1500;
34
35pub const DEFAULT_RECV_TIMEOUT: u64 = 2;
36
37pub fn create_multicast_socket(
42 to_address: &SocketAddr,
43 options: &Options,
44) -> Result<UdpSocket, Error> {
45 debug!("create_multicast_socket - options: {:?}", options);
46 let local_address = match interface::ip_address_for_interface(
47 &options.network_interface,
48 &options.network_version,
49 ) {
50 None => match &options.network_version {
51 Some(IP::V6) => SocketAddr::V6(SocketAddrV6::new(
52 Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0),
53 options.local_port,
54 0,
55 0,
56 )),
57 _ => SocketAddr::V4(SocketAddrV4::new(
58 Ipv4Addr::new(0, 0, 0, 0),
59 options.local_port,
60 )),
61 },
62 Some(address) => SocketAddr::new(address, options.local_port),
63 };
64 trace!(
65 "create_multicast_socket - binding to local_address: {:?}",
66 local_address
67 );
68 let socket = UdpSocket::bind(local_address)?;
69
70 trace!("create_multicast_socket - setting socket options");
71 socket.set_nonblocking(false)?;
72 socket.set_ttl(options.packet_ttl)?;
73 socket.set_read_timeout(Some(Duration::from_secs(options.recv_timeout)))?;
74 match (to_address, local_address) {
75 (SocketAddr::V4(to_address), SocketAddr::V4(local_address)) => {
76 socket.join_multicast_v4(to_address.ip(), local_address.ip())?;
77 socket.set_multicast_ttl_v4(if options.local_network_only { 1 } else { 10 })?;
78 socket.set_multicast_loop_v4(options.loop_back_also)?;
79 }
80 (SocketAddr::V6(_), SocketAddr::V6(_)) => {
81 socket.set_multicast_loop_v6(options.loop_back_also)?;
82 }
83 _ => {
84 return invalid_socket_value(
85 "to, local",
86 &format!("{}, {}", to_address, local_address),
87 )
88 .into();
89 }
90 }
91
92 trace!(
93 "create_multicast_socket - socket: {:?}, read_timeout: {:?}, ttl: {:?}, multicast_ttl: {}",
94 socket,
95 socket.read_timeout()?,
96 socket.ttl()?,
97 socket.multicast_ttl_v4()?
98 );
99
100 Ok(socket)
101}
102
103pub fn multicast(
104 message: &Request,
105 to_address: &SocketAddr,
106 options: &Options,
107) -> Result<Vec<Response>, Error> {
108 let socket = create_multicast_socket(to_address, options)?;
109
110 multicast_using(message, to_address, &socket)
111}
112
113pub fn multicast_once(
114 message: &Request,
115 to_address: &SocketAddr,
116 options: &Options,
117) -> Result<(), Error> {
118 let socket = create_multicast_socket(to_address, options)?;
119
120 multicast_once_using(message, to_address, &socket)
121}
122
123pub fn multicast_using(
124 message: &Request,
125 to_address: &SocketAddr,
126 socket: &UdpSocket,
127) -> Result<Vec<Response>, Error> {
128 multicast_send_using(message, to_address, socket)?;
129
130 let mut responses: Vec<Response> = Default::default();
131
132 loop {
133 let mut buf = [0u8; DEFAULT_BUFFER_SIZE];
134 trace!(
135 "multicast_using - blocking on recv_from, buffer size {}",
136 DEFAULT_BUFFER_SIZE
137 );
138 match socket.recv_from(&mut buf) {
139 Ok((received, from)) => {
140 trace!(
141 "multicast_using - received {} bytes from {:?}",
142 received,
143 from,
144 );
145 responses.push(Response::try_from(&buf[..received])?);
146 }
147 Err(e) => {
148 if e.kind() == IOErrorKind::WouldBlock {
149 trace!("multicast_using - socket timed out, no data");
150 break;
151 } else {
152 error!("multicast_using - socket read returned error: {:?}", e);
153 return Err(Error::NetworkTransport(e));
154 }
155 }
156 }
157 }
158 Ok(responses)
159}
160
161pub fn multicast_once_using(
162 message: &Request,
163 to_address: &SocketAddr,
164 socket: &UdpSocket,
165) -> Result<(), Error> {
166 multicast_send_using(message, to_address, socket)
167}
168
169impl Default for Options {
174 fn default() -> Self {
175 Options {
176 network_interface: None,
177 network_version: None,
178 local_port: 0,
179 recv_timeout: DEFAULT_RECV_TIMEOUT,
180 packet_ttl: 2,
181 local_network_only: false,
182 loop_back_also: false,
183 }
185 }
186}
187
188#[inline]
193fn multicast_send_using(
194 message: &Request,
195 to_address: &SocketAddr,
196 socket: &UdpSocket,
197) -> Result<(), Error> {
198 let message: String = message.into();
199 socket.send_to(message.as_bytes(), to_address)?;
200 Ok(())
201}
202
203#[doc(hidden)]
208mod builder;
209pub use builder::RequestBuilder;
210
211#[doc(hidden)]
212mod request;
213pub use request::Request;
214
215#[doc(hidden)]
216mod response;
217pub use response::Response;