dotlib/
multicast.rs

1use std::io;
2use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
3use std::sync::{Arc, Barrier};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::thread::{self, JoinHandle};
6use std::time::Duration;
7
8use socket2::{Domain, Protocol, SockAddr, Socket, Type};
9
10lazy_static! {
11    //pub static ref IPV4: IpAddr = Ipv4Addr::new(224, 0, 0, 123).into();
12    //pub static ref IPV6: IpAddr = Ipv6Addr::new(0xFF03, 0, 0, 0, 0, 0, 0, 0x0123).into();
13    pub static ref STOP: &'static str = "stop";
14}
15
16// this will be common for all our sockets
17fn new_socket(addr: &SocketAddr) -> io::Result<Socket> {
18    let domain = if addr.is_ipv4() {
19        Domain::ipv4()
20    } else {
21        Domain::ipv6()
22    };
23
24    let socket = Socket::new(domain, Type::dgram(), Some(Protocol::udp()))?;
25
26    // we're going to use read timeouts so that we don't hang waiting for packets
27    socket.set_read_timeout(Some(Duration::from_millis(100)))?;
28
29    Ok(socket)
30}
31
/// Binds `socket` to the wildcard address of `addr`'s family, keeping `addr`'s port.
///
/// On Windows, unlike all Unix variants, it is improper to bind to the multicast address
/// itself, so only the port of `addr` is used here.
///
/// see <https://msdn.microsoft.com/en-us/library/windows/desktop/ms737550(v=vs.85).aspx>
///
/// # Errors
///
/// Returns the I/O error reported by the underlying `bind` call.
#[cfg(windows)]
fn bind_multicast(socket: &Socket, addr: &SocketAddr) -> io::Result<()> {
    let wildcard_ip: IpAddr = if addr.is_ipv4() {
        Ipv4Addr::new(0, 0, 0, 0).into()
    } else {
        Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into()
    };
    let bind_addr = SocketAddr::new(wildcard_ip, addr.port());

    socket.bind(&SockAddr::from(bind_addr))
}
45
46/// On unixes we bind to the multicast address, which causes multicast packets to be filtered
47#[cfg(unix)]
48fn bind_multicast(socket: &Socket, addr: &SocketAddr) -> io::Result<()> {
49    socket.bind(&socket2::SockAddr::from(*addr))
50}
51
52fn join_multicast(addr: SocketAddr) -> io::Result<UdpSocket> {
53    let ip_addr = addr.ip();
54
55    let socket = new_socket(&addr)?;
56
57    // depending on the IP protocol we have slightly different work
58    match ip_addr {
59        IpAddr::V4(ref mdns_v4) => {
60            // join to the multicast address, with all interfaces
61            socket.join_multicast_v4(mdns_v4, &Ipv4Addr::new(0, 0, 0, 0))?;
62        }
63        IpAddr::V6(ref mdns_v6) => {
64            // join to the multicast address, with all interfaces (ipv6 uses indexes not addresses)
65            socket.join_multicast_v6(mdns_v6, 0)?;
66            socket.set_only_v6(true)?;
67        }
68    };
69
70    socket.set_reuse_address(true);
71    
72    #[cfg(unix)]
73    socket.set_reuse_port(true);
74
75    #[cfg(unix)]
76    info!("Warning: Cannot set reuse of ports on Windows.");
77
78    // bind us to the socket address.
79    bind_multicast(&socket, &addr)?;
80
81    // convert to standard sockets
82    Ok(socket.into_udp_socket())
83}
84
85pub fn multicast_listener(
86    addr: SocketAddr,
87) -> JoinHandle<()> {
88    // A barrier to not start the client test code until after the server is running
89    let server_barrier = Arc::new(Barrier::new(2));
90    let client_barrier = Arc::clone(&server_barrier);
91
92    let join_handle = std::thread::Builder::new()
93        .name(format!("dot:listener"))
94        .spawn(move || {
95            // socket creation will go here...
96            let listener = join_multicast(addr).expect("failed to create listener");
97            println!("dot:listener: joined: {}",  addr);
98
99            server_barrier.wait();
100            println!("dot:listener: is ready");
101
102            let mut stop = false;
103            //listener.set_nonblocking(true).unwrap();
104            while !stop {
105                // test receive and response code will go here...
106                let mut buf = [0u8; 1024]; // receive buffer
107                
108                match listener.recv_from(&mut buf) {
109                    Ok((len, remote_addr)) => {
110                        let data = &buf[..len];
111                        
112                        // break when stop message recevied
113                        if *STOP.as_bytes() == *data.clone() {
114                            stop = true;
115                        }
116
117                        println!(
118                            "dot:listener: received request: {} from: {}",
119                            String::from_utf8_lossy(data),
120                            remote_addr
121                        );
122                    }
123                    Err(err) => {
124                        //println!("dot:listener: got an error: {}", err);
125                    }
126                }    
127            }
128
129            println!("dot:listener: stopped!");
130        })
131        .unwrap();
132
133    client_barrier.wait();
134    join_handle
135}
136
137pub fn new_sender(addr: &SocketAddr) -> io::Result<UdpSocket> {
138    let socket = new_socket(addr)?;
139
140    if addr.is_ipv4() {
141        socket.set_multicast_if_v4(&Ipv4Addr::new(0, 0, 0, 0))?;
142
143        socket.bind(&SockAddr::from(SocketAddr::new(
144            Ipv4Addr::new(0, 0, 0, 0).into(),
145            0,
146        )))?;
147    } else {
148        // *WARNING* THIS IS SPECIFIC TO THE AUTHORS COMPUTER
149        //   find the index of your IPv6 interface you'd like to test with.
150        socket.set_multicast_if_v6(5)?;
151
152        socket.bind(&SockAddr::from(SocketAddr::new(
153            Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(),
154            0,
155        )))?;
156    }
157
158    // convert to standard sockets...
159    Ok(socket.into_udp_socket())
160}