mproxy_client/lib.rs
1//! Multicast Network Dispatcher and Proxy
2//!
3//! # MPROXY: Client
4//! Stream file or socket data via UDP. Supports multicast routing
5//!
6//!
7//! ## Quick Start
8//! In `Cargo.toml`
9//! ```toml
10//! [dependencies]
11//! mproxy-client = "0.1"
12//! ```
13//!
14//! Example `src/main.rs`
15//! ```rust,no_run
16//! use std::path::PathBuf;
17//! use std::thread::spawn;
18//!
19//! use mproxy_client::client_socket_stream;
20//!
21//! // read input from stdin
22//! let path = PathBuf::from("-");
23//!
24//! // downstream UDP socket addresses
25//! let server_addrs = vec!["127.0.0.1:9919".into(), "localhost:9921".into(), "[ff02::1]:9920".into()];
26//!
27//! // copy input to stdout
28//! let tee = true;
29//!
30//! let client_thread = spawn(move || {
31//! client_socket_stream(&path, server_addrs, tee).unwrap();
32//! });
33//!
34//! // run client until EOF
35//! client_thread.join().unwrap();
36//! ```
37//!
38//! ## Command Line Interface
39//! Install with cargo
40//! ```bash
41//! cargo install mproxy-client
42//! ```
43//!
44//! ```text
45//! MPROXY: UDP Client
46//!
47//! Stream local data to logging servers via UDP
48//!
49//! USAGE:
50//! mproxy-client [FLAGS] [OPTIONS] ...
51//!
52//! OPTIONS:
53//! --path [FILE_DESCRIPTOR] Filepath, descriptor, or handle. Use "-" for stdin
54//! --server-addr [HOSTNAME:PORT] Downstream UDP server address. May be repeated
55//!
56//! FLAGS:
57//! -h, --help Prints help information
58//! -t, --tee Copy input to stdout
59//!
60//! EXAMPLE:
61//! mproxy-client --path /dev/random --server-addr '127.0.0.1:9920' --server-addr '[::1]:9921'
62//! mproxy-client --path - --server-addr '224.0.0.1:9922' --server-addr '[ff02::1]:9923' --tee >> logfile.log
63//! ```
64//!
65//! ### See Also
66//! - [mproxy-client](https://docs.rs/mproxy-client/)
67//! - [mproxy-server](https://docs.rs/mproxy-server/)
68//! - [mproxy-forward](https://docs.rs/mproxy-forward/)
69//! - [mproxy-reverse](https://docs.rs/mproxy-reverse/)
70//!
71
72use std::fs::OpenOptions;
73use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Result as ioResult, Write};
74use std::net::{IpAddr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket};
75use std::path::PathBuf;
76use std::str::FromStr;
77
/// Size in bytes of the read buffer used by `client_socket_stream`.
///
/// Each successful read fills at most this many bytes, and the bytes obtained
/// by one read are forwarded as a single UDP payload.
const BUFSIZE: usize = 8096;
79
/// Resolve `server_addr` and open a UDP socket for sending to it.
///
/// The first address that `server_addr` resolves to becomes the target. The
/// socket is bound to the unspecified address of the target's address family
/// with port 0, so the operating system chooses the local port.
///
/// When the target is a multicast address, the socket also joins that group:
/// - IPv4: joined on the unspecified interface.
/// - IPv6: the socket is first connected (to the unspecified IPv6 address with
///   the target port, or to the target address itself on Windows) and then
///   joins the group. The interface index is 0, except on macOS where the
///   index of the default network interface is used.
///
/// Returns the resolved target address together with the bound socket.
///
/// # Errors
/// Returns an error instead of panicking when `server_addr` cannot be parsed
/// or resolved, when it resolves to no address at all, or when binding,
/// connecting, or joining the multicast group fails.
pub fn target_socket_interface(server_addr: &String) -> ioResult<(SocketAddr, UdpSocket)> {
    let target_addr = server_addr.to_socket_addrs()?.next().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("no socket address found for {}", server_addr),
        )
    })?;

    // Bind to a random UDP port, matching the address family of the target.
    let unspec = if target_addr.is_ipv4() {
        SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), 0)
    } else {
        SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0)
    };
    let target_socket = UdpSocket::bind(unspec)?;

    if target_addr.ip().is_multicast() {
        match target_addr.ip() {
            // Join the IPv4 multicast group on the unspecified interface.
            IpAddr::V4(ip) => {
                target_socket.join_multicast_v4(&ip, &std::net::Ipv4Addr::UNSPECIFIED)?;
            }

            // For IPv6 multicast, connect the socket first and then join the
            // group, in the same order as before.
            IpAddr::V6(ip) => {
                // macOS needs a concrete interface index; everywhere else
                // index 0 selects the unspecified interface.
                #[cfg(target_os = "macos")]
                let itf = default_net::get_default_interface()
                    .map_err(|e| {
                        std::io::Error::new(
                            std::io::ErrorKind::Other,
                            format!("Getting default network interface: {}", e),
                        )
                    })?
                    .index;

                #[cfg(not(target_os = "macos"))]
                let itf = 0;

                // Windows connects to the target itself; other platforms
                // connect to the unspecified address with the target port.
                let connect_addr = if cfg!(target_os = "windows") {
                    target_addr
                } else {
                    SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), target_addr.port())
                };
                target_socket.connect(connect_addr)?;

                target_socket.join_multicast_v6(&ip, itf)?;
            }
        }
    }

    Ok((target_addr, target_socket))
}
143
144/// Read bytes from `path` info a buffer, and forward to downstream UDP server addresses.
145/// Optionally copy output to stdout
146pub fn client_socket_stream(path: &PathBuf, server_addrs: Vec<String>, tee: bool) -> ioResult<()> {
147 let mut targets = vec![];
148
149 for server_addr in server_addrs {
150 let (target_addr, target_socket) = target_socket_interface(&server_addr)?;
151
152 targets.push((target_addr, target_socket));
153 println!(
154 "logging from {}: sending to {}",
155 &path.as_os_str().to_str().unwrap(),
156 server_addr,
157 );
158 }
159
160 // if path is "-" set read buffer to stdin
161 // otherwise, create buffered reader from given file descriptor
162 let mut reader: Box<dyn BufRead> = if path == &PathBuf::from_str("-").unwrap() {
163 Box::new(BufReader::new(stdin()))
164 } else {
165 Box::new(BufReader::new(
166 OpenOptions::new()
167 .create(false)
168 .write(false)
169 .read(true)
170 .open(path)
171 .unwrap_or_else(|e| {
172 panic!("opening {}, {}", path.as_os_str().to_str().unwrap(), e)
173 }),
174 ))
175 };
176
177 let mut buf = vec![0u8; BUFSIZE];
178 let mut output_buffer = BufWriter::new(stdout());
179
180 while let Ok(c) = reader.read(&mut buf) {
181 if c == 0 {
182 #[cfg(debug_assertions)]
183 println!(
184 "\nclient: encountered EOF in {}, exiting...",
185 &path.display(),
186 );
187 break;
188 } else if c == 1 && String::from_utf8(buf[0..c].to_vec()).unwrap() == *"\n" {
189 // skip empty lines
190 continue;
191 }
192
193 for (target_addr, target_socket) in &targets {
194 if !(target_addr.is_ipv6() && target_addr.ip().is_multicast()) {
195 target_socket
196 .send_to(&buf[0..c], target_addr)
197 .unwrap_or_else(|e| panic!("sending to server socket: {}", e));
198 } else {
199 target_socket
200 .send(&buf[0..c])
201 .unwrap_or_else(|e| panic!("sending to server socket: {}", e));
202 }
203 }
204 if tee {
205 let _o = output_buffer
206 .write(&buf[0..c])
207 .expect("writing to output buffer");
208 output_buffer.flush().unwrap();
209 #[cfg(debug_assertions)]
210 assert!(c == _o);
211 }
212 }
213 Ok(())
214}