mproxy_client/
lib.rs

1//! Multicast Network Dispatcher and Proxy
2//!
3//! # MPROXY: Client
4//! Stream file or socket data via UDP. Supports multicast routing
5//!
6//!
7//! ## Quick Start
8//! In `Cargo.toml`
9//! ```toml
10//! [dependencies]
11//! mproxy-client = "0.1"
12//! ```
13//!
14//! Example `src/main.rs`
15//! ```rust,no_run
16//! use std::path::PathBuf;
17//! use std::thread::spawn;
18//!
19//! use mproxy_client::client_socket_stream;
20//!
21//! // read input from stdin
22//! let path = PathBuf::from("-");
23//!
24//! // downstream UDP socket addresses
25//! let server_addrs =  vec!["127.0.0.1:9919".into(), "localhost:9921".into(), "[ff02::1]:9920".into()];
26//!
27//! // copy input to stdout
28//! let tee = true;
29//!
30//! let client_thread = spawn(move || {
31//!     client_socket_stream(&path, server_addrs, tee).unwrap();
32//! });
33//!
34//! // run client until EOF
35//! client_thread.join().unwrap();
36//! ```
37//!
38//! ## Command Line Interface
39//! Install with cargo
40//! ```bash
41//! cargo install mproxy-client
42//! ```
43//!
44//! ```text
45//! MPROXY: UDP Client
46//!
47//! Stream local data to logging servers via UDP
48//!
49//! USAGE:
50//!   mproxy-client [FLAGS] [OPTIONS] ...
51//!
52//! OPTIONS:
53//!   --path        [FILE_DESCRIPTOR]   Filepath, descriptor, or handle. Use "-" for stdin
54//!   --server-addr [HOSTNAME:PORT]     Downstream UDP server address. May be repeated
55//!
56//! FLAGS:
57//!   -h, --help    Prints help information
58//!   -t, --tee     Copy input to stdout
59//!
60//! EXAMPLE:
61//!   mproxy-client --path /dev/random --server-addr '127.0.0.1:9920' --server-addr '[::1]:9921'
62//!   mproxy-client --path - --server-addr '224.0.0.1:9922' --server-addr '[ff02::1]:9923' --tee >> logfile.log
63//! ```
64//!
65//! ### See Also
66//! - [mproxy-client](https://docs.rs/mproxy-client/)
67//! - [mproxy-server](https://docs.rs/mproxy-server/)
68//! - [mproxy-forward](https://docs.rs/mproxy-forward/)
69//! - [mproxy-reverse](https://docs.rs/mproxy-reverse/)
70//!
71
72use std::fs::OpenOptions;
73use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Result as ioResult, Write};
74use std::net::{IpAddr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket};
75use std::path::PathBuf;
76use std::str::FromStr;
77
/// Size in bytes of the buffer that `client_socket_stream` fills on each read.
///
/// Every chunk read into this buffer is sent downstream as a single UDP
/// datagram, so this is also the largest payload the client emits.
// NOTE(review): 8096 is not a power of two; 8192 may have been intended. The
// value is kept as-is because changing it alters datagram sizes on the wire.
const BUFSIZE: usize = 8096;
79
/// Resolve `server_addr` and open a UDP socket for sending to it.
///
/// The first address that `server_addr` resolves to becomes the target. The
/// returned socket is bound to the unspecified address of the same family
/// (IPv4 or IPv6) on an OS-assigned port.
///
/// If the target is a multicast address the socket also joins that group:
/// - IPv4: joined on the unspecified interface.
/// - IPv6: the socket is first connected (to the unspecified address with the
///   target port on non-Windows systems, to the target itself on Windows) and
///   then joins the group. Interface index 0 is used, except on macOS where
///   the default interface's index is used. Because the socket is connected,
///   callers send with `send` rather than `send_to`.
///
/// # Errors
/// Returns an error if the address cannot be parsed or resolved, resolves to
/// no addresses, or if binding, connecting, or joining the multicast group
/// fails.
// `&String` is kept (rather than `&str`) so existing callers that pass this
// function by value, e.g. `.map(target_socket_interface)`, keep compiling.
#[allow(clippy::ptr_arg)]
pub fn target_socket_interface(server_addr: &String) -> ioResult<(SocketAddr, UdpSocket)> {
    let target_addr = server_addr.to_socket_addrs()?.next().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("no socket address resolved for {}", server_addr),
        )
    })?;

    // Binds to a random UDP port for sending to downstream.
    let unspec: SocketAddr = if target_addr.is_ipv4() {
        SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), 0)
    } else {
        SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0)
    };
    let target_socket = UdpSocket::bind(unspec)?;

    match target_addr.ip() {
        // join the ipv4 multicast group
        IpAddr::V4(ip) if ip.is_multicast() => {
            target_socket.join_multicast_v4(&ip, &std::net::Ipv4Addr::UNSPECIFIED)?;
        }

        // for multicast ipv6, connect the socket, then join the multicast group
        IpAddr::V6(ip) if ip.is_multicast() => {
            #[cfg(target_os = "macos")]
            let itf = default_net::get_default_interface()
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                .index;

            // index 0 for unspecified interface
            #[cfg(not(target_os = "macos"))]
            let itf = 0;

            #[cfg(not(target_os = "windows"))]
            target_socket.connect(SocketAddr::new(
                IpAddr::V6(Ipv6Addr::UNSPECIFIED),
                target_addr.port(),
            ))?;

            #[cfg(target_os = "windows")]
            target_socket.connect(target_addr)?;

            target_socket.join_multicast_v6(&ip, itf)?;
        }

        // unicast targets need no extra socket setup
        _ => {}
    }

    Ok((target_addr, target_socket))
}
143
144/// Read bytes from `path` info a buffer, and forward to downstream UDP server addresses.
145/// Optionally copy output to stdout
146pub fn client_socket_stream(path: &PathBuf, server_addrs: Vec<String>, tee: bool) -> ioResult<()> {
147    let mut targets = vec![];
148
149    for server_addr in server_addrs {
150        let (target_addr, target_socket) = target_socket_interface(&server_addr)?;
151
152        targets.push((target_addr, target_socket));
153        println!(
154            "logging from {}: sending to {}",
155            &path.as_os_str().to_str().unwrap(),
156            server_addr,
157        );
158    }
159
160    // if path is "-" set read buffer to stdin
161    // otherwise, create buffered reader from given file descriptor
162    let mut reader: Box<dyn BufRead> = if path == &PathBuf::from_str("-").unwrap() {
163        Box::new(BufReader::new(stdin()))
164    } else {
165        Box::new(BufReader::new(
166            OpenOptions::new()
167                .create(false)
168                .write(false)
169                .read(true)
170                .open(path)
171                .unwrap_or_else(|e| {
172                    panic!("opening {}, {}", path.as_os_str().to_str().unwrap(), e)
173                }),
174        ))
175    };
176
177    let mut buf = vec![0u8; BUFSIZE];
178    let mut output_buffer = BufWriter::new(stdout());
179
180    while let Ok(c) = reader.read(&mut buf) {
181        if c == 0 {
182            #[cfg(debug_assertions)]
183            println!(
184                "\nclient: encountered EOF in {}, exiting...",
185                &path.display(),
186            );
187            break;
188        } else if c == 1 && String::from_utf8(buf[0..c].to_vec()).unwrap() == *"\n" {
189            // skip empty lines
190            continue;
191        }
192
193        for (target_addr, target_socket) in &targets {
194            if !(target_addr.is_ipv6() && target_addr.ip().is_multicast()) {
195                target_socket
196                    .send_to(&buf[0..c], target_addr)
197                    .unwrap_or_else(|e| panic!("sending to server socket: {}", e));
198            } else {
199                target_socket
200                    .send(&buf[0..c])
201                    .unwrap_or_else(|e| panic!("sending to server socket: {}", e));
202            }
203        }
204        if tee {
205            let _o = output_buffer
206                .write(&buf[0..c])
207                .expect("writing to output buffer");
208            output_buffer.flush().unwrap();
209            #[cfg(debug_assertions)]
210            assert!(c == _o);
211        }
212    }
213    Ok(())
214}