mproxy_reverse/
lib.rs

1//! Multicast Network Dispatcher and Proxy
2//!
3//! # MPROXY: Reverse Proxy
4//! Forward upstream TCP and UDP upstream to downstream listeners.
5//! Messages are routed via UDP multicast to downstream sender threads.
6//! Spawns one thread per listener.
7//!
8//!
9//! ## Quick Start
10//! In `Cargo.toml`
11//! ```toml
12//! [dependencies]
13//! mproxy-reverse = "0.1"
14//! ```
15//!
16//! Example `src/main.rs`
17//! ```rust,no_run
18//! use mproxy_reverse::{reverse_proxy_tcp_udp, reverse_proxy_udp, reverse_proxy_udp_tcp};
19//!
20//! let udp_listen_addr: Option<String> = Some("0.0.0.0:9920".into());
21//! let tcp_listen_addr: Option<String> = None;
22//! let multicast_addr: String = "[ff02::1]:9918".into();
23//! let tcp_output_addr: Option<String> = Some("[::1]:9921".into());
24//! let udp_output_addr: Option<String> = None;
25//!
26//! let mut threads = vec![];
27//!
28//! // TCP connection listener -> UDP multicast channel
29//! if let Some(tcpin) = tcp_listen_addr {
30//!     let tcp_rproxy = reverse_proxy_tcp_udp(tcpin, multicast_addr.clone());
31//!     threads.push(tcp_rproxy);
32//! }
33//!
34//! // UDP multicast listener -> TCP sender
35//! if let Some(tcpout) = &tcp_output_addr {
36//!     let tcp_proxy = reverse_proxy_udp_tcp(multicast_addr.clone(), tcpout.to_string());
37//!     threads.push(tcp_proxy);
38//! }
39//!
40//! // UDP multicast listener -> UDP sender
41//! if let Some(udpout) = udp_output_addr {
42//!     let udp_proxy = reverse_proxy_udp(multicast_addr, udpout);
43//!     threads.push(udp_proxy);
44//! }
45//!
46//! for thread in threads {
47//!     thread.join().unwrap();
48//! }
49//! ```
50//!
51//! ## Command Line Interface
52//! Install with Cargo
53//! ```bash
54//! cargo install mproxy-reverse
55//! ```
56//!
57//! ```text
58//! MPROXY: Reverse Proxy
59//!
60//! Forward upstream TCP and UDP upstream to downstream listeners.
61//! Messages are routed via UDP multicast to downstream sender threads.
62//! Spawns one thread per listener.
63//!
64//! USAGE:
65//!   mproxy-reverse  [FLAGS] [OPTIONS]
66//!
67//! OPTIONS:
68//!   --udp-listen-addr [HOSTNAME:PORT]     Spawn a UDP socket listener, and forward to --multicast-addr
69//!   --tcp_listen_addr [HOSTNAME:PORT]     Reverse-proxy accepting TCP connections and forwarding to --multicast-addr
70//!   --multicast-addr  [MULTICAST_IP:PORT] Defaults to '[ff02::1]:9918'
71//!   --tcp-output-addr [HOSTNAME:PORT]     Forward packets from --multicast-addr to TCP downstream
72//!   --udp_output_addr [HOSTNAME:PORT]     Forward packets from --multicast-addr to UDP downstream
73//!
74//! FLAGS:
75//!   -h, --help    Prints help information
76//!   -t, --tee     Print UDP input to stdout
77//!
78//! EXAMPLE:
79//!   mproxy-reverse --udp-listen-addr '0.0.0.0:9920' --tcp-output-addr '[::1]:9921' --multicast-addr '224.0.0.1:9922'
80//! ```
81//!
82//! ### See Also
83//! - [mproxy-client](https://docs.rs/mproxy-client/)
84//! - [mproxy-server](https://docs.rs/mproxy-server/)
85//! - [mproxy-forward](https://docs.rs/mproxy-forward/)
86//! - [mproxy-reverse](https://docs.rs/mproxy-reverse/)
87//!
88
89use std::io::{BufWriter, Read, Write};
90use std::net::{TcpListener, TcpStream};
91use std::thread::{spawn, JoinHandle};
92
93use mproxy_client::target_socket_interface;
94use mproxy_server::upstream_socket_interface;
95
96const BUFSIZE: usize = 8096;
97
98fn handle_client_tcp(downstream: TcpStream, multicast_addr: String) {
99    #[cfg(debug_assertions)]
100    println!(
101        "handling downstream client: {} UDP -> {:?} TCP",
102        multicast_addr, downstream
103    );
104    let (_multicast_addr, multicast_socket) =
105        if let Ok((addr, socket)) = upstream_socket_interface(multicast_addr) {
106            if !addr.ip().is_multicast() {
107                panic!("not a multicast address {}", addr);
108            }
109            (addr, socket)
110        } else {
111            panic!()
112        };
113
114    let mut buf = [0u8; BUFSIZE];
115    let mut tcp_writer = BufWriter::new(downstream);
116
117    loop {
118        match multicast_socket.recv_from(&mut buf[0..]) {
119            Ok((count_input, _remote_addr)) => {
120                //println!("{}", String::from_utf8_lossy(&buf[0..count_input]));
121                let _count_output = tcp_writer.write(&buf[0..count_input]);
122            }
123            Err(err) => {
124                eprintln!("reverse_proxy: got an error: {}", err);
125                break;
126            }
127        }
128        if let Err(_e) = tcp_writer.flush() {
129            #[cfg(debug_assertions)]
130            eprintln!("reverse_proxy: closing {:?} {}", multicast_socket, _e);
131            break;
132        }
133    }
134}
135
136/// Forward a UDP socket stream (e.g. from a multicast channel) to connected TCP clients.
137/// Spawns a listener thread, plus one thread for each incoming TCP connection.
138pub fn reverse_proxy_udp_tcp(multicast_addr: String, tcp_listen_addr: String) -> JoinHandle<()> {
139    #[cfg(debug_assertions)]
140    println!(
141        "forwarding: {} UDP -> {} TCP",
142        multicast_addr, tcp_listen_addr
143    );
144    spawn(move || {
145        let listener = TcpListener::bind(tcp_listen_addr).expect("binding downstream TCP Listener");
146        for stream in listener.incoming() {
147            #[cfg(debug_assertions)]
148            println!("new client {:?}", stream);
149            let multicast_addr = multicast_addr.clone();
150            let _tcp_client = spawn(move || {
151                handle_client_tcp(stream.unwrap(), multicast_addr);
152            });
153        }
154    })
155}
156
157/// Forward bytes from UDP upstream socket address to UDP downstream socket address
158pub fn reverse_proxy_udp(udp_input_addr: String, udp_output_addr: String) -> JoinHandle<()> {
159    #[cfg(debug_assertions)]
160    println!(
161        "forwarding: {} UDP -> {} UDP",
162        udp_input_addr, udp_output_addr
163    );
164    spawn(move || {
165        let (addr, listen_socket) = upstream_socket_interface(udp_input_addr).unwrap();
166        let (outaddr, output_socket) = target_socket_interface(&udp_output_addr).unwrap();
167
168        let mut buf = [0u8; BUFSIZE];
169        loop {
170            match listen_socket.recv_from(&mut buf[0..]) {
171                Ok((c, remote_addr)) => {
172                    if c == 0 {
173                        eprintln!("got message with size 0 from upstream: {}", remote_addr);
174                    } else {
175                        let c_out = output_socket
176                            .send_to(&buf[0..c], outaddr)
177                            .expect("forwarding UDP downstream");
178                        assert!(c == c_out);
179                        //println!("{}", String::from_utf8_lossy(&buf[0..c]));
180                    }
181                }
182                Err(err) => {
183                    eprintln!("{}:reverse_proxy: error {}", addr, err);
184                    break;
185                }
186            }
187        }
188    })
189}
190
191/// Listen for incoming TCP connections and forward received bytes to a UDP socket address
192pub fn reverse_proxy_tcp_udp(upstream_tcp: String, downstream_udp: String) -> JoinHandle<()> {
193    //pub fn reverse_proxy_tcp_udp(upstream_tcp: String, downstream_udp: String) {
194    spawn(move || {
195        let listener = TcpListener::bind(upstream_tcp).expect("binding TCP socket");
196
197        for upstream in listener.incoming() {
198            let (target_addr, target_socket) = target_socket_interface(&downstream_udp).unwrap();
199            let mut buf = [0u8; BUFSIZE];
200            //let mut stream = stream.as_ref().expect("connecting to stream");
201
202            match upstream {
203                Ok(mut input) => {
204                    spawn(move || loop {
205                        match input.read(&mut buf[0..]) {
206                            Ok(c) => {
207                                target_socket
208                                    .send_to(&buf[0..c], target_addr)
209                                    .expect("sending to UDP socket");
210                            }
211                            Err(e) => {
212                                eprintln!("err: {}", e);
213                                break;
214                            }
215                        }
216                    });
217                }
218                Err(e) => {
219                    eprintln!("dropping client: {}", e);
220                }
221            }
222        }
223    })
224}