mproxy_reverse/lib.rs
1//! Multicast Network Dispatcher and Proxy
2//!
3//! # MPROXY: Reverse Proxy
4//! Forward upstream TCP and UDP upstream to downstream listeners.
5//! Messages are routed via UDP multicast to downstream sender threads.
6//! Spawns one thread per listener.
7//!
8//!
9//! ## Quick Start
10//! In `Cargo.toml`
11//! ```toml
12//! [dependencies]
13//! mproxy-reverse = "0.1"
14//! ```
15//!
16//! Example `src/main.rs`
17//! ```rust,no_run
18//! use mproxy_reverse::{reverse_proxy_tcp_udp, reverse_proxy_udp, reverse_proxy_udp_tcp};
19//!
20//! let udp_listen_addr: Option<String> = Some("0.0.0.0:9920".into());
21//! let tcp_listen_addr: Option<String> = None;
22//! let multicast_addr: String = "[ff02::1]:9918".into();
23//! let tcp_output_addr: Option<String> = Some("[::1]:9921".into());
24//! let udp_output_addr: Option<String> = None;
25//!
26//! let mut threads = vec![];
27//!
28//! // TCP connection listener -> UDP multicast channel
29//! if let Some(tcpin) = tcp_listen_addr {
30//! let tcp_rproxy = reverse_proxy_tcp_udp(tcpin, multicast_addr.clone());
31//! threads.push(tcp_rproxy);
32//! }
33//!
34//! // UDP multicast listener -> TCP sender
35//! if let Some(tcpout) = &tcp_output_addr {
36//! let tcp_proxy = reverse_proxy_udp_tcp(multicast_addr.clone(), tcpout.to_string());
37//! threads.push(tcp_proxy);
38//! }
39//!
40//! // UDP multicast listener -> UDP sender
41//! if let Some(udpout) = udp_output_addr {
42//! let udp_proxy = reverse_proxy_udp(multicast_addr, udpout);
43//! threads.push(udp_proxy);
44//! }
45//!
46//! for thread in threads {
47//! thread.join().unwrap();
48//! }
49//! ```
50//!
51//! ## Command Line Interface
52//! Install with Cargo
53//! ```bash
54//! cargo install mproxy-reverse
55//! ```
56//!
57//! ```text
58//! MPROXY: Reverse Proxy
59//!
60//! Forward upstream TCP and UDP upstream to downstream listeners.
61//! Messages are routed via UDP multicast to downstream sender threads.
62//! Spawns one thread per listener.
63//!
64//! USAGE:
65//! mproxy-reverse [FLAGS] [OPTIONS]
66//!
67//! OPTIONS:
68//! --udp-listen-addr [HOSTNAME:PORT] Spawn a UDP socket listener, and forward to --multicast-addr
69//! --tcp_listen_addr [HOSTNAME:PORT] Reverse-proxy accepting TCP connections and forwarding to --multicast-addr
70//! --multicast-addr [MULTICAST_IP:PORT] Defaults to '[ff02::1]:9918'
71//! --tcp-output-addr [HOSTNAME:PORT] Forward packets from --multicast-addr to TCP downstream
72//! --udp_output_addr [HOSTNAME:PORT] Forward packets from --multicast-addr to UDP downstream
73//!
74//! FLAGS:
75//! -h, --help Prints help information
76//! -t, --tee Print UDP input to stdout
77//!
78//! EXAMPLE:
79//! mproxy-reverse --udp-listen-addr '0.0.0.0:9920' --tcp-output-addr '[::1]:9921' --multicast-addr '224.0.0.1:9922'
80//! ```
81//!
82//! ### See Also
83//! - [mproxy-client](https://docs.rs/mproxy-client/)
84//! - [mproxy-server](https://docs.rs/mproxy-server/)
85//! - [mproxy-forward](https://docs.rs/mproxy-forward/)
86//! - [mproxy-reverse](https://docs.rs/mproxy-reverse/)
87//!
88
89use std::io::{BufWriter, Read, Write};
90use std::net::{TcpListener, TcpStream};
91use std::thread::{spawn, JoinHandle};
92
93use mproxy_client::target_socket_interface;
94use mproxy_server::upstream_socket_interface;
95
96const BUFSIZE: usize = 8096;
97
98fn handle_client_tcp(downstream: TcpStream, multicast_addr: String) {
99 #[cfg(debug_assertions)]
100 println!(
101 "handling downstream client: {} UDP -> {:?} TCP",
102 multicast_addr, downstream
103 );
104 let (_multicast_addr, multicast_socket) =
105 if let Ok((addr, socket)) = upstream_socket_interface(multicast_addr) {
106 if !addr.ip().is_multicast() {
107 panic!("not a multicast address {}", addr);
108 }
109 (addr, socket)
110 } else {
111 panic!()
112 };
113
114 let mut buf = [0u8; BUFSIZE];
115 let mut tcp_writer = BufWriter::new(downstream);
116
117 loop {
118 match multicast_socket.recv_from(&mut buf[0..]) {
119 Ok((count_input, _remote_addr)) => {
120 //println!("{}", String::from_utf8_lossy(&buf[0..count_input]));
121 let _count_output = tcp_writer.write(&buf[0..count_input]);
122 }
123 Err(err) => {
124 eprintln!("reverse_proxy: got an error: {}", err);
125 break;
126 }
127 }
128 if let Err(_e) = tcp_writer.flush() {
129 #[cfg(debug_assertions)]
130 eprintln!("reverse_proxy: closing {:?} {}", multicast_socket, _e);
131 break;
132 }
133 }
134}
135
136/// Forward a UDP socket stream (e.g. from a multicast channel) to connected TCP clients.
137/// Spawns a listener thread, plus one thread for each incoming TCP connection.
138pub fn reverse_proxy_udp_tcp(multicast_addr: String, tcp_listen_addr: String) -> JoinHandle<()> {
139 #[cfg(debug_assertions)]
140 println!(
141 "forwarding: {} UDP -> {} TCP",
142 multicast_addr, tcp_listen_addr
143 );
144 spawn(move || {
145 let listener = TcpListener::bind(tcp_listen_addr).expect("binding downstream TCP Listener");
146 for stream in listener.incoming() {
147 #[cfg(debug_assertions)]
148 println!("new client {:?}", stream);
149 let multicast_addr = multicast_addr.clone();
150 let _tcp_client = spawn(move || {
151 handle_client_tcp(stream.unwrap(), multicast_addr);
152 });
153 }
154 })
155}
156
157/// Forward bytes from UDP upstream socket address to UDP downstream socket address
158pub fn reverse_proxy_udp(udp_input_addr: String, udp_output_addr: String) -> JoinHandle<()> {
159 #[cfg(debug_assertions)]
160 println!(
161 "forwarding: {} UDP -> {} UDP",
162 udp_input_addr, udp_output_addr
163 );
164 spawn(move || {
165 let (addr, listen_socket) = upstream_socket_interface(udp_input_addr).unwrap();
166 let (outaddr, output_socket) = target_socket_interface(&udp_output_addr).unwrap();
167
168 let mut buf = [0u8; BUFSIZE];
169 loop {
170 match listen_socket.recv_from(&mut buf[0..]) {
171 Ok((c, remote_addr)) => {
172 if c == 0 {
173 eprintln!("got message with size 0 from upstream: {}", remote_addr);
174 } else {
175 let c_out = output_socket
176 .send_to(&buf[0..c], outaddr)
177 .expect("forwarding UDP downstream");
178 assert!(c == c_out);
179 //println!("{}", String::from_utf8_lossy(&buf[0..c]));
180 }
181 }
182 Err(err) => {
183 eprintln!("{}:reverse_proxy: error {}", addr, err);
184 break;
185 }
186 }
187 }
188 })
189}
190
191/// Listen for incoming TCP connections and forward received bytes to a UDP socket address
192pub fn reverse_proxy_tcp_udp(upstream_tcp: String, downstream_udp: String) -> JoinHandle<()> {
193 //pub fn reverse_proxy_tcp_udp(upstream_tcp: String, downstream_udp: String) {
194 spawn(move || {
195 let listener = TcpListener::bind(upstream_tcp).expect("binding TCP socket");
196
197 for upstream in listener.incoming() {
198 let (target_addr, target_socket) = target_socket_interface(&downstream_udp).unwrap();
199 let mut buf = [0u8; BUFSIZE];
200 //let mut stream = stream.as_ref().expect("connecting to stream");
201
202 match upstream {
203 Ok(mut input) => {
204 spawn(move || loop {
205 match input.read(&mut buf[0..]) {
206 Ok(c) => {
207 target_socket
208 .send_to(&buf[0..c], target_addr)
209 .expect("sending to UDP socket");
210 }
211 Err(e) => {
212 eprintln!("err: {}", e);
213 break;
214 }
215 }
216 });
217 }
218 Err(e) => {
219 eprintln!("dropping client: {}", e);
220 }
221 }
222 }
223 })
224}