use std::io::{BufWriter, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread::{spawn, JoinHandle};
use mproxy_client::target_socket_interface;
use mproxy_server::upstream_socket_interface;
const BUFSIZE: usize = 8096;
fn handle_client_tcp(downstream: TcpStream, multicast_addr: String) {
#[cfg(debug_assertions)]
println!(
"handling downstream client: {} UDP -> {:?} TCP",
multicast_addr, downstream
);
let (_multicast_addr, multicast_socket) =
if let Ok((addr, socket)) = upstream_socket_interface(multicast_addr) {
if !addr.ip().is_multicast() {
panic!("not a multicast address {}", addr);
}
(addr, socket)
} else {
panic!()
};
let mut buf = [0u8; BUFSIZE];
let mut tcp_writer = BufWriter::new(downstream);
loop {
match multicast_socket.recv_from(&mut buf[0..]) {
Ok((count_input, _remote_addr)) => {
let _count_output = tcp_writer.write(&buf[0..count_input]);
}
Err(err) => {
eprintln!("reverse_proxy: got an error: {}", err);
break;
}
}
if let Err(_e) = tcp_writer.flush() {
#[cfg(debug_assertions)]
eprintln!("reverse_proxy: closing {:?} {}", multicast_socket, _e);
break;
}
}
}
pub fn reverse_proxy_udp_tcp(multicast_addr: String, tcp_listen_addr: String) -> JoinHandle<()> {
#[cfg(debug_assertions)]
println!(
"forwarding: {} UDP -> {} TCP",
multicast_addr, tcp_listen_addr
);
spawn(move || {
let listener = TcpListener::bind(tcp_listen_addr).expect("binding downstream TCP Listener");
for stream in listener.incoming() {
#[cfg(debug_assertions)]
println!("new client {:?}", stream);
let multicast_addr = multicast_addr.clone();
let _tcp_client = spawn(move || {
handle_client_tcp(stream.unwrap(), multicast_addr);
});
}
})
}
pub fn reverse_proxy_udp(udp_input_addr: String, udp_output_addr: String) -> JoinHandle<()> {
#[cfg(debug_assertions)]
println!(
"forwarding: {} UDP -> {} UDP",
udp_input_addr, udp_output_addr
);
spawn(move || {
let (addr, listen_socket) = upstream_socket_interface(udp_input_addr).unwrap();
let (outaddr, output_socket) = target_socket_interface(&udp_output_addr).unwrap();
let mut buf = [0u8; BUFSIZE];
loop {
match listen_socket.recv_from(&mut buf[0..]) {
Ok((c, remote_addr)) => {
if c == 0 {
eprintln!("got message with size 0 from upstream: {}", remote_addr);
} else {
let c_out = output_socket
.send_to(&buf[0..c], outaddr)
.expect("forwarding UDP downstream");
assert!(c == c_out);
}
}
Err(err) => {
eprintln!("{}:reverse_proxy: error {}", addr, err);
break;
}
}
}
})
}
pub fn reverse_proxy_tcp_udp(upstream_tcp: String, downstream_udp: String) -> JoinHandle<()> {
spawn(move || {
let listener = TcpListener::bind(upstream_tcp).expect("binding TCP socket");
for upstream in listener.incoming() {
let (target_addr, target_socket) = target_socket_interface(&downstream_udp).unwrap();
let mut buf = [0u8; BUFSIZE];
match upstream {
Ok(mut input) => {
spawn(move || loop {
match input.read(&mut buf[0..]) {
Ok(c) => {
target_socket
.send_to(&buf[0..c], target_addr)
.expect("sending to UDP socket");
}
Err(e) => {
eprintln!("err: {}", e);
break;
}
}
});
}
Err(e) => {
eprintln!("dropping client: {}", e);
}
}
}
})
}