1use crate::{
10 bbframe::{BBFrameDefrag, BBFrameReceiver, BBFrameRecv, BBFrameStream},
11 gsepacket::{GSEPacketDefrag, PDU},
12};
13use anyhow::{Context, Result};
14use clap::Parser;
15use std::{
16 net::{SocketAddr, TcpListener, UdpSocket},
17 os::unix::io::AsRawFd,
18 sync::{mpsc, Arc, Mutex},
19 thread,
20 time::Duration,
21};
22
23#[derive(Parser, Debug)]
25#[command(author, version, about, long_about = None)]
26struct Args {
27 #[arg(long)]
29 listen: SocketAddr,
30 #[arg(long)]
32 tun: String,
33 #[arg(long, default_value_t)]
35 input: InputFormat,
36 #[arg(long, default_value_t = 0)]
38 header_length: usize,
39 #[arg(long)]
41 isi: Option<u8>,
42 #[arg(long, default_value_t = 100.0)]
44 stats_interval: f64,
45 #[arg(long)]
47 skip_total_length: bool,
48}
49
/// Input format selected on the command line.
///
/// The variant decides which socket type `main` binds and which BBFRAME
/// receiver it instantiates. The textual names accepted on the command line
/// are defined by the `FromStr` implementation and printed by `Display`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default)]
enum InputFormat {
    /// UDP input handled by `BBFrameDefrag` (the default). Receive errors are
    /// treated as fatal in this mode.
    #[default]
    UdpFragments,
    /// UDP input handled by `BBFrameRecv`. Receive errors are counted and the
    /// loop carries on.
    UdpComplete,
    /// TCP input: every accepted connection is handled by its own
    /// `BBFrameStream`.
    Tcp,
}
60
61impl std::str::FromStr for InputFormat {
62 type Err = String;
63
64 fn from_str(s: &str) -> Result<Self, Self::Err> {
65 Ok(match s {
66 "UDP" | "UDP fragments" => InputFormat::UdpFragments,
67 "UDP complete" => InputFormat::UdpComplete,
68 "TCP" => InputFormat::Tcp,
69 _ => return Err(format!("invalid input format {s}")),
70 })
71 }
72}
73
74impl std::fmt::Display for InputFormat {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
76 write!(
77 f,
78 "{}",
79 match self {
80 InputFormat::UdpFragments => "UDP fragments",
81 InputFormat::UdpComplete => "UDP complete",
82 InputFormat::Tcp => "TCP",
83 }
84 )
85 }
86}
87
88fn setup_multicast(socket: &UdpSocket, addr: &SocketAddr) -> Result<()> {
89 match addr.ip() {
90 std::net::IpAddr::V4(addr) if addr.is_multicast() => {
91 set_reuseaddr(socket)?;
92 log::info!("joining multicast address {}", addr);
93 socket.join_multicast_v4(&addr, &std::net::Ipv4Addr::UNSPECIFIED)?;
94 }
95 std::net::IpAddr::V6(addr) if addr.is_multicast() => {
96 set_reuseaddr(socket)?;
97 log::info!("joining multicast address {}", addr);
98 socket.join_multicast_v6(&addr, 0)?;
99 }
100 _ => (),
101 }
102 Ok(())
103}
104
105fn set_reuseaddr(socket: &UdpSocket) -> Result<()> {
106 let optval: libc::c_int = 1;
107 if unsafe {
108 libc::setsockopt(
109 socket.as_raw_fd(),
110 libc::SOL_SOCKET,
111 libc::SO_REUSEADDR,
112 &optval as *const _ as *const libc::c_void,
113 libc::socklen_t::try_from(std::mem::size_of::<libc::c_int>()).unwrap(),
114 )
115 } != 0
116 {
117 let err = std::io::Error::last_os_error();
118 anyhow::bail!("could not set SO_REUSEADDR: {err}")
119 }
120 Ok(())
121}
122
/// State of the single-threaded receive loop used by the UDP input formats.
///
/// `D` is the BBFRAME receiver; the `app_loop` method requires it to
/// implement `BBFrameReceiver`.
#[derive(Debug)]
struct AppLoop<D> {
    /// Source of BBFRAMEs (queried with `get_bbframe`).
    bbframe_recv: D,
    /// Defragmenter that extracts PDUs from each received BBFRAME.
    gsepacket_defrag: GSEPacketDefrag,
    /// TUN device that the extracted PDUs are written to.
    tun: tun_tap::Iface,
    /// When `true`, a receive error terminates the loop with an error; when
    /// `false`, the error is only counted and the loop continues.
    bbframe_recv_errors_fatal: bool,
    /// Counters shared with the statistics reporting thread.
    stats: Arc<Mutex<Stats>>,
}
131
132fn write_pdu_tun(pdu: &PDU, tun: &mut tun_tap::Iface, stats: &mut Stats) {
133 if let Err(err) = tun.send(pdu.data()) {
134 log::error!("could not write packet to TUN device: {err}");
135 stats.tun_errors += 1;
136 }
137}
138
139impl<D: BBFrameReceiver> AppLoop<D> {
140 fn app_loop(&mut self) -> Result<()> {
141 loop {
142 let bbframe = self.bbframe_recv.get_bbframe();
143 let mut stats = self.stats.lock().unwrap();
144 let bbframe = match bbframe {
145 Ok(b) => {
146 stats.bbframes += 1;
147 b
148 }
149 Err(err) => {
150 stats.bbframe_errors += 1;
151 if self.bbframe_recv_errors_fatal {
152 return Err(err).context("failed to receive BBFRAME");
153 } else {
154 continue;
155 }
156 }
157 };
158 for pdu in self.gsepacket_defrag.defragment(&bbframe).unwrap() {
160 stats.gse_packets += 1;
161 write_pdu_tun(&pdu, &mut self.tun, &mut stats);
162 }
163 drop(stats);
166 }
167 }
168}
169
170fn gsepacket_defragmenter(args: &Args) -> GSEPacketDefrag {
171 let mut defrag = GSEPacketDefrag::new();
172 defrag.set_skip_total_length_check(args.skip_total_length);
173 defrag
174}
175
/// Counters that are periodically logged by `report_stats`.
///
/// All counters start at zero (`Default`) and are only ever incremented.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
struct Stats {
    /// Number of BBFRAMEs received successfully.
    bbframes: u64,
    /// Number of failed attempts to receive a BBFRAME.
    bbframe_errors: u64,
    /// Number of PDUs obtained from the GSE packet defragmenter.
    gse_packets: u64,
    /// Number of failed writes to the TUN device.
    tun_errors: u64,
}
183
184fn report_stats(stats: &Mutex<Stats>, interval: Duration) {
185 loop {
186 {
187 let stats = stats.lock().unwrap();
188 log::info!(
189 "BBFRAMES: {}, BBFRAME errors: {}, GSE packets: {}, TUN errors: {}",
190 stats.bbframes,
191 stats.bbframe_errors,
192 stats.gse_packets,
193 stats.tun_errors
194 );
195 }
196 std::thread::sleep(interval);
197 }
198}
199
200pub fn main() -> Result<()> {
202 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
203 let args = Args::parse();
204 let mut tun = tun_tap::Iface::without_packet_info(&args.tun, tun_tap::Mode::Tun)
205 .context("failed to open TUN device")?;
206 log::info!("dvb-gse v{} started", env!("CARGO_PKG_VERSION"));
207 let stats = Arc::new(Mutex::new(Stats::default()));
208 if args.stats_interval != 0.0 {
209 std::thread::spawn({
210 let stats = Arc::clone(&stats);
211 move || {
212 report_stats(&stats, Duration::from_secs_f64(args.stats_interval));
213 }
214 });
215 }
216 match args.input {
217 InputFormat::UdpFragments | InputFormat::UdpComplete => {
218 let gsepacket_defrag = gsepacket_defragmenter(&args);
219 let socket = UdpSocket::bind(args.listen).context("failed to bind to UDP socket")?;
220 setup_multicast(&socket, &args.listen)?;
221 match args.input {
222 InputFormat::UdpFragments => {
223 let mut bbframe_recv = BBFrameDefrag::new(socket);
224 bbframe_recv.set_isi(args.isi);
225 bbframe_recv.set_header_bytes(args.header_length)?;
226 let mut app = AppLoop {
227 bbframe_recv,
228 gsepacket_defrag,
229 tun,
230 bbframe_recv_errors_fatal: true,
231 stats,
232 };
233 app.app_loop()?;
234 }
235 InputFormat::UdpComplete => {
236 let mut bbframe_recv = BBFrameRecv::new(socket);
237 bbframe_recv.set_isi(args.isi);
238 bbframe_recv.set_header_bytes(args.header_length)?;
239 let mut app = AppLoop {
240 bbframe_recv,
241 gsepacket_defrag,
242 tun,
243 bbframe_recv_errors_fatal: false,
244 stats,
245 };
246 app.app_loop()?;
247 }
248 _ => unreachable!(),
249 }
250 }
251 InputFormat::Tcp => {
252 let listener =
253 TcpListener::bind(args.listen).context("failed to bind to TCP socket")?;
254 let channel_capacity = 64;
259 let (tun_tx, tun_rx) = mpsc::sync_channel(channel_capacity);
260 thread::spawn({
261 let stats = Arc::clone(&stats);
262 move || {
263 for pdu in tun_rx.iter() {
264 write_pdu_tun(&pdu, &mut tun, &mut stats.lock().unwrap());
265 }
266 }
267 });
268 thread::scope(|s| {
270 for stream in listener.incoming() {
271 let stream = match stream {
272 Ok(s) => s,
273 Err(e) => {
274 log::error!("connection error {e}");
275 continue;
276 }
277 };
278 match stream.peer_addr() {
279 Ok(addr) => log::info!("TCP client connected from {addr}"),
280 Err(err) => log::error!(
281 "TCP client connected (but could not retrieve peer address): {err}"
282 ),
283 }
284 s.spawn({
285 let args = &args;
286 let tun_tx = tun_tx.clone();
287 let stats = &stats;
288 move || {
289 let mut gsepacket_defrag = gsepacket_defragmenter(args);
290 let mut bbframe_recv = BBFrameStream::new(stream);
291 bbframe_recv.set_isi(args.isi);
292 if let Err(err) = bbframe_recv.set_header_bytes(args.header_length) {
293 eprintln!("could not set header length: {err}");
294 std::process::exit(1);
295 }
296 loop {
297 let bbframe = bbframe_recv.get_bbframe();
298 let bbframe = {
299 let mut stats = stats.lock().unwrap();
300 match bbframe {
301 Ok(b) => {
302 stats.bbframes += 1;
303 b
304 }
305 Err(err) => {
306 log::error!("failed to receive BBFRAME; terminating connection: {err}");
307 stats.bbframe_errors += 1;
308 return;
309 }
310 }
311 };
312 for pdu in gsepacket_defrag.defragment(&bbframe).unwrap() {
314 tun_tx.send(pdu).unwrap();
315 }
316 }
317 }
318 });
319 }
320 });
321 }
322 }
323 Ok(())
324}