1use crate::{
10 bbframe::{BBFrameDefrag, BBFrameReceiver, BBFrameRecv, BBFrameStream},
11 gseheader::Label,
12 gsepacket::{GSEPacketDefrag, PDU},
13};
14use anyhow::{Context, Result};
15use clap::Parser;
16use std::{
17 net::{SocketAddr, TcpListener, UdpSocket},
18 os::unix::io::AsRawFd,
19 sync::{Arc, Mutex, mpsc},
20 thread,
21 time::Duration,
22};
23
24#[derive(Parser, Debug)]
26#[command(author, version, about, long_about = None)]
27struct Args {
28 #[arg(long)]
30 listen: SocketAddr,
31 #[arg(long)]
33 tun: String,
34 #[arg(long, default_value_t)]
36 input: InputFormat,
37 #[arg(long, default_value_t = 0)]
39 header_length: usize,
40 #[arg(long)]
42 isi: Option<u8>,
43 #[arg(long, default_value_t = 100.0)]
45 stats_interval: f64,
46 #[arg(long)]
48 skip_total_length: bool,
49 #[arg(long)]
54 allow_broadcast: bool,
55 #[arg(long, value_parser = Label::from_hex)]
61 allow_label: Vec<Label>,
62}
63
/// Label filtering configuration used to decide which PDUs are written to the
/// TUN device.
///
/// When `allow_broadcast` is `false` and `allow_label` is empty, no filtering
/// is configured and every label is accepted.
#[derive(Debug, Clone)]
struct AllowSettings {
    /// Whether PDUs carrying a broadcast label are accepted.
    allow_broadcast: bool,
    /// Explicit list of non-broadcast labels that are accepted.
    allow_label: Vec<Label>,
}
69
70impl AllowSettings {
71 fn is_label_allowed(&self, label: &Label) -> bool {
72 if !self.allow_broadcast && self.allow_label.is_empty() {
73 return true;
75 }
76 if label.is_broadcast() {
77 return self.allow_broadcast;
78 }
79 self.allow_label.iter().any(|allowed| label == allowed)
80 }
81}
82
83impl From<&Args> for AllowSettings {
84 fn from(args: &Args) -> AllowSettings {
85 AllowSettings {
86 allow_broadcast: args.allow_broadcast,
87 allow_label: args.allow_label.clone(),
88 }
89 }
90}
91
/// Format in which BBFRAMEs are delivered to the application.
///
/// The textual names are "UDP fragments" (alias "UDP"), "UDP complete" and
/// "TCP".
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default)]
enum InputFormat {
    /// "UDP fragments" / "UDP": a UDP socket read through `BBFrameDefrag`.
    /// This is the default.
    #[default]
    UdpFragments,
    /// "UDP complete": a UDP socket read through `BBFrameRecv`.
    UdpComplete,
    /// "TCP": accepted TCP connections, each read through `BBFrameStream`.
    Tcp,
}
102
103impl std::str::FromStr for InputFormat {
104 type Err = String;
105
106 fn from_str(s: &str) -> Result<Self, Self::Err> {
107 Ok(match s {
108 "UDP" | "UDP fragments" => InputFormat::UdpFragments,
109 "UDP complete" => InputFormat::UdpComplete,
110 "TCP" => InputFormat::Tcp,
111 _ => return Err(format!("invalid input format {s}")),
112 })
113 }
114}
115
116impl std::fmt::Display for InputFormat {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
118 write!(
119 f,
120 "{}",
121 match self {
122 InputFormat::UdpFragments => "UDP fragments",
123 InputFormat::UdpComplete => "UDP complete",
124 InputFormat::Tcp => "TCP",
125 }
126 )
127 }
128}
129
130fn setup_multicast(socket: &UdpSocket, addr: &SocketAddr) -> Result<()> {
131 match addr.ip() {
132 std::net::IpAddr::V4(addr) if addr.is_multicast() => {
133 set_reuseaddr(socket)?;
134 log::info!("joining multicast address {}", addr);
135 socket.join_multicast_v4(&addr, &std::net::Ipv4Addr::UNSPECIFIED)?;
136 }
137 std::net::IpAddr::V6(addr) if addr.is_multicast() => {
138 set_reuseaddr(socket)?;
139 log::info!("joining multicast address {}", addr);
140 socket.join_multicast_v6(&addr, 0)?;
141 }
142 _ => (),
143 }
144 Ok(())
145}
146
147fn set_reuseaddr(socket: &UdpSocket) -> Result<()> {
148 let optval: libc::c_int = 1;
149 if unsafe {
150 libc::setsockopt(
151 socket.as_raw_fd(),
152 libc::SOL_SOCKET,
153 libc::SO_REUSEADDR,
154 &optval as *const _ as *const libc::c_void,
155 libc::socklen_t::try_from(std::mem::size_of::<libc::c_int>()).unwrap(),
156 )
157 } != 0
158 {
159 let err = std::io::Error::last_os_error();
160 anyhow::bail!("could not set SO_REUSEADDR: {err}")
161 }
162 Ok(())
163}
164
/// State of the single-threaded processing loop: BBFRAMEs are obtained from
/// `bbframe_recv`, defragmented into PDUs, and the PDUs are written to a TUN
/// device.
#[derive(Debug)]
struct AppLoop<D> {
    /// Source of BBFRAMEs (`get_bbframe` is called on it in the loop).
    bbframe_recv: D,
    /// Defragmenter that extracts PDUs from each received BBFRAME.
    gsepacket_defrag: GSEPacketDefrag,
    /// TUN device to which accepted PDUs are written.
    tun: tun_tap::Iface,
    /// If `true`, an error from `bbframe_recv` terminates the loop with that
    /// error; if `false`, the error is only counted and the loop continues.
    bbframe_recv_errors_fatal: bool,
    /// Counters shared with the statistics reporting thread.
    stats: Arc<Mutex<Stats>>,
    /// Label filter applied to each PDU before it is written to the TUN.
    allow_settings: AllowSettings,
}
174
175fn write_pdu_tun(
176 pdu: &PDU,
177 tun: &mut tun_tap::Iface,
178 stats: &mut Stats,
179 allow_settings: &AllowSettings,
180) {
181 stats.gse_packets += 1;
182 if !allow_settings.is_label_allowed(pdu.label()) {
183 stats.gse_packets_dropped_by_label += 1;
184 return;
185 }
186 if let Err(err) = tun.send(pdu.data()) {
187 log::error!("could not write packet to TUN device: {err}");
188 stats.tun_errors += 1;
189 }
190}
191
192impl<D: BBFrameReceiver> AppLoop<D> {
193 fn app_loop(&mut self) -> Result<()> {
194 loop {
195 let bbframe = self.bbframe_recv.get_bbframe();
196 let mut stats = self.stats.lock().unwrap();
197 let bbframe = match bbframe {
198 Ok(b) => {
199 stats.bbframes += 1;
200 b
201 }
202 Err(err) => {
203 stats.bbframe_errors += 1;
204 if self.bbframe_recv_errors_fatal {
205 return Err(err).context("failed to receive BBFRAME");
206 } else {
207 continue;
208 }
209 }
210 };
211 for pdu in self.gsepacket_defrag.defragment(&bbframe).unwrap() {
213 write_pdu_tun(&pdu, &mut self.tun, &mut stats, &self.allow_settings);
214 }
215 drop(stats);
218 }
219 }
220}
221
222fn gsepacket_defragmenter(args: &Args) -> GSEPacketDefrag {
223 let mut defrag = GSEPacketDefrag::new();
224 defrag.set_skip_total_length_check(args.skip_total_length);
225 defrag
226}
227
/// Counters that are updated while processing input and periodically written
/// to the log.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
struct Stats {
    /// Number of BBFRAMEs received successfully.
    bbframes: u64,
    /// Number of errors returned when trying to receive a BBFRAME.
    bbframe_errors: u64,
    /// Number of PDUs handed over for writing to the TUN (counted before the
    /// label filter is applied).
    gse_packets: u64,
    /// Number of PDUs discarded because their label was not allowed.
    gse_packets_dropped_by_label: u64,
    /// Number of failed writes to the TUN device.
    tun_errors: u64,
}
236
237fn report_stats(stats: &Mutex<Stats>, interval: Duration) {
238 loop {
239 {
240 let stats = stats.lock().unwrap();
241 log::info!(
242 "BBFRAMES: {}, BBFRAME errors: {}, GSE packets: {}, GSE packets dropped by label: {}, TUN errors: {}",
243 stats.bbframes,
244 stats.bbframe_errors,
245 stats.gse_packets,
246 stats.gse_packets_dropped_by_label,
247 stats.tun_errors
248 );
249 }
250 std::thread::sleep(interval);
251 }
252}
253
254pub fn main() -> Result<()> {
256 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
257 let args = Args::parse();
258 let mut tun = tun_tap::Iface::without_packet_info(&args.tun, tun_tap::Mode::Tun)
259 .context("failed to open TUN device")?;
260 log::info!("dvb-gse v{} started", env!("CARGO_PKG_VERSION"));
261 let stats = Arc::new(Mutex::new(Stats::default()));
262 if args.stats_interval != 0.0 {
263 std::thread::spawn({
264 let stats = Arc::clone(&stats);
265 move || {
266 report_stats(&stats, Duration::from_secs_f64(args.stats_interval));
267 }
268 });
269 }
270 match args.input {
271 InputFormat::UdpFragments | InputFormat::UdpComplete => {
272 let gsepacket_defrag = gsepacket_defragmenter(&args);
273 let socket = UdpSocket::bind(args.listen).context("failed to bind to UDP socket")?;
274 setup_multicast(&socket, &args.listen)?;
275 let allow_settings = AllowSettings::from(&args);
276 match args.input {
277 InputFormat::UdpFragments => {
278 let mut bbframe_recv = BBFrameDefrag::new(socket);
279 bbframe_recv.set_isi(args.isi);
280 bbframe_recv.set_header_bytes(args.header_length)?;
281 let mut app = AppLoop {
282 bbframe_recv,
283 gsepacket_defrag,
284 tun,
285 bbframe_recv_errors_fatal: true,
286 stats,
287 allow_settings,
288 };
289 app.app_loop()?;
290 }
291 InputFormat::UdpComplete => {
292 let mut bbframe_recv = BBFrameRecv::new(socket);
293 bbframe_recv.set_isi(args.isi);
294 bbframe_recv.set_header_bytes(args.header_length)?;
295 let mut app = AppLoop {
296 bbframe_recv,
297 gsepacket_defrag,
298 tun,
299 bbframe_recv_errors_fatal: false,
300 stats,
301 allow_settings,
302 };
303 app.app_loop()?;
304 }
305 _ => unreachable!(),
306 }
307 }
308 InputFormat::Tcp => {
309 let listener =
310 TcpListener::bind(args.listen).context("failed to bind to TCP socket")?;
311 let channel_capacity = 64;
316 let (tun_tx, tun_rx) = mpsc::sync_channel(channel_capacity);
317 let allow_settings = AllowSettings::from(&args);
318 thread::spawn({
319 let stats = Arc::clone(&stats);
320 move || {
321 for pdu in tun_rx.iter() {
322 write_pdu_tun(&pdu, &mut tun, &mut stats.lock().unwrap(), &allow_settings);
323 }
324 }
325 });
326 thread::scope(|s| {
328 for stream in listener.incoming() {
329 let stream = match stream {
330 Ok(s) => s,
331 Err(e) => {
332 log::error!("connection error {e}");
333 continue;
334 }
335 };
336 match stream.peer_addr() {
337 Ok(addr) => log::info!("TCP client connected from {addr}"),
338 Err(err) => log::error!(
339 "TCP client connected (but could not retrieve peer address): {err}"
340 ),
341 }
342 s.spawn({
343 let args = &args;
344 let tun_tx = tun_tx.clone();
345 let stats = &stats;
346 move || {
347 let mut gsepacket_defrag = gsepacket_defragmenter(args);
348 let mut bbframe_recv = BBFrameStream::new(stream);
349 bbframe_recv.set_isi(args.isi);
350 if let Err(err) = bbframe_recv.set_header_bytes(args.header_length) {
351 eprintln!("could not set header length: {err}");
352 std::process::exit(1);
353 }
354 loop {
355 let bbframe = bbframe_recv.get_bbframe();
356 let bbframe = {
357 let mut stats = stats.lock().unwrap();
358 match bbframe {
359 Ok(b) => {
360 stats.bbframes += 1;
361 b
362 }
363 Err(err) => {
364 log::error!("failed to receive BBFRAME; terminating connection: {err}");
365 stats.bbframe_errors += 1;
366 return;
367 }
368 }
369 };
370 for pdu in gsepacket_defrag.defragment(&bbframe).unwrap() {
372 tun_tx.send(pdu).unwrap();
373 }
374 }
375 }
376 });
377 }
378 });
379 }
380 }
381 Ok(())
382}