Skip to main content

dvb_gse/
cli.rs

//! CLI application.
//!
//! This module implements a CLI application that receives UDP or TCP packets
//! containing BBFRAMEs from an external DVB-S2 receiver such as
//! [`Longmynd`](https://github.com/BritishAmateurTelevisionClub/longmynd). It
//! obtains IP packets from a continuous-mode GSE stream, and sends the IP
//! packets to a TUN device.
8
9use crate::{
10    bbframe::{BBFrameDefrag, BBFrameReceiver, BBFrameRecv, BBFrameStream},
11    gseheader::Label,
12    gsepacket::{GSEPacketDefrag, PDU},
13};
14use anyhow::{Context, Result};
15use clap::Parser;
16use std::{
17    net::{SocketAddr, TcpListener, UdpSocket},
18    os::unix::io::AsRawFd,
19    sync::{Arc, Mutex, mpsc},
20    thread,
21    time::Duration,
22};
23
24/// Receive DVB-GSE and send PDUs into a TUN device.
25#[derive(Parser, Debug)]
26#[command(author, version, about, long_about = None)]
27struct Args {
28    /// IP address and port to listen on to receive DVB-S2 BBFRAMEs.
29    #[arg(long)]
30    listen: SocketAddr,
31    /// TUN interface name.
32    #[arg(long)]
33    tun: String,
34    /// Input format: "UDP fragments", "UDP complete", or "TCP".
35    #[arg(long, default_value_t)]
36    input: InputFormat,
37    /// Input header length (the header is discarded).
38    #[arg(long, default_value_t = 0)]
39    header_length: usize,
40    /// ISI to process in MIS mode (if this option is not specified, run in SIS mode).
41    #[arg(long)]
42    isi: Option<u8>,
43    /// Time interval used to log statistics (in seconds).
44    #[arg(long, default_value_t = 100.0)]
45    stats_interval: f64,
46    /// Skip checking the GSE total length field.
47    #[arg(long)]
48    skip_total_length: bool,
49    /// Allow GSE packets addressed to the broadcast label (no label).
50    ///
51    /// If this argument is used, all the GSE packets not explicitly allowed via
52    /// CLI arguments are dropped.
53    #[arg(long)]
54    allow_broadcast: bool,
55    /// Allow GSE packets addressed to this 3-byte or 6-byte label.
56    ///
57    /// If this argument is used, all the GSE packets not explicitly allowed via
58    /// CLI arguments are dropped. This argument can be used multiple times to
59    /// allow multiple labels.
60    #[arg(long, value_parser = Label::from_hex)]
61    allow_label: Vec<Label>,
62}
63
64#[derive(Debug, Clone)]
65struct AllowSettings {
66    allow_broadcast: bool,
67    allow_label: Vec<Label>,
68}
69
70impl AllowSettings {
71    fn is_label_allowed(&self, label: &Label) -> bool {
72        if !self.allow_broadcast && self.allow_label.is_empty() {
73            // no 'allow' arguments used; allow everything
74            return true;
75        }
76        if label.is_broadcast() {
77            return self.allow_broadcast;
78        }
79        self.allow_label.iter().any(|allowed| label == allowed)
80    }
81}
82
83impl From<&Args> for AllowSettings {
84    fn from(args: &Args) -> AllowSettings {
85        AllowSettings {
86            allow_broadcast: args.allow_broadcast,
87            allow_label: args.allow_label.clone(),
88        }
89    }
90}
91
/// How BBFRAMEs are delivered to the application.
///
/// The textual forms accepted by [`std::str::FromStr`] and produced by
/// [`std::fmt::Display`] are `"UDP fragments"`, `"UDP complete"` and `"TCP"`;
/// parsing additionally accepts `"UDP"` as an alias for `"UDP fragments"`.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
enum InputFormat {
    /// BBFRAME fragments in UDP datagrams.
    #[default]
    UdpFragments,
    /// Complete BBFRAMEs in UDP datagrams.
    UdpComplete,
    /// BBFRAMEs in a TCP stream.
    Tcp,
}

impl std::str::FromStr for InputFormat {
    type Err = String;

    /// Parses an input format name (exact, case-sensitive match).
    ///
    /// # Errors
    ///
    /// Returns a message naming the rejected string if it is not one of the
    /// recognized names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "UDP" | "UDP fragments" => Ok(InputFormat::UdpFragments),
            "UDP complete" => Ok(InputFormat::UdpComplete),
            "TCP" => Ok(InputFormat::Tcp),
            _ => Err(format!("invalid input format {s}")),
        }
    }
}

impl std::fmt::Display for InputFormat {
    /// Writes the canonical name of the input format.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            InputFormat::UdpFragments => "UDP fragments",
            InputFormat::UdpComplete => "UDP complete",
            InputFormat::Tcp => "TCP",
        };
        // Written verbatim: width/fill flags of the caller's format spec are
        // not applied.
        f.write_str(name)
    }
}
129
130fn setup_multicast(socket: &UdpSocket, addr: &SocketAddr) -> Result<()> {
131    match addr.ip() {
132        std::net::IpAddr::V4(addr) if addr.is_multicast() => {
133            set_reuseaddr(socket)?;
134            log::info!("joining multicast address {}", addr);
135            socket.join_multicast_v4(&addr, &std::net::Ipv4Addr::UNSPECIFIED)?;
136        }
137        std::net::IpAddr::V6(addr) if addr.is_multicast() => {
138            set_reuseaddr(socket)?;
139            log::info!("joining multicast address {}", addr);
140            socket.join_multicast_v6(&addr, 0)?;
141        }
142        _ => (),
143    }
144    Ok(())
145}
146
147fn set_reuseaddr(socket: &UdpSocket) -> Result<()> {
148    let optval: libc::c_int = 1;
149    if unsafe {
150        libc::setsockopt(
151            socket.as_raw_fd(),
152            libc::SOL_SOCKET,
153            libc::SO_REUSEADDR,
154            &optval as *const _ as *const libc::c_void,
155            libc::socklen_t::try_from(std::mem::size_of::<libc::c_int>()).unwrap(),
156        )
157    } != 0
158    {
159        let err = std::io::Error::last_os_error();
160        anyhow::bail!("could not set SO_REUSEADDR: {err}")
161    }
162    Ok(())
163}
164
165#[derive(Debug)]
166struct AppLoop<D> {
167    bbframe_recv: D,
168    gsepacket_defrag: GSEPacketDefrag,
169    tun: tun_tap::Iface,
170    bbframe_recv_errors_fatal: bool,
171    stats: Arc<Mutex<Stats>>,
172    allow_settings: AllowSettings,
173}
174
175fn write_pdu_tun(
176    pdu: &PDU,
177    tun: &mut tun_tap::Iface,
178    stats: &mut Stats,
179    allow_settings: &AllowSettings,
180) {
181    stats.gse_pdus += 1;
182    if !allow_settings.is_label_allowed(pdu.label()) {
183        stats.gse_pdus_dropped_by_label += 1;
184        return;
185    }
186    if let Err(err) = tun.send(pdu.data()) {
187        log::error!("could not write packet to TUN device: {err}");
188        stats.tun_errors += 1;
189    }
190}
191
192impl<D: BBFrameReceiver> AppLoop<D> {
193    fn app_loop(&mut self) -> Result<()> {
194        loop {
195            let bbframe = self.bbframe_recv.get_bbframe();
196            let mut stats = self.stats.lock().unwrap();
197            let bbframe = match bbframe {
198                Ok(b) => {
199                    stats.bbframes += 1;
200                    b
201                }
202                Err(err) => {
203                    stats.bbframe_errors += 1;
204                    if self.bbframe_recv_errors_fatal {
205                        return Err(err).context("failed to receive BBFRAME");
206                    } else {
207                        continue;
208                    }
209                }
210            };
211            // the BBFRAME was validated by bbframe_recv, so we can unwrap here
212            for pdu in self.gsepacket_defrag.defragment(&bbframe).unwrap() {
213                write_pdu_tun(&pdu, &mut self.tun, &mut stats, &self.allow_settings);
214            }
215            // drop stats mutex lock explicitly, for good measure in case code
216            // is added below
217            drop(stats);
218        }
219    }
220}
221
222fn gsepacket_defragmenter(args: &Args) -> GSEPacketDefrag {
223    let mut defrag = GSEPacketDefrag::new();
224    defrag.set_skip_total_length_check(args.skip_total_length);
225    defrag
226}
227
/// Running counters that are periodically written to the log.
///
/// All counters start at zero (see the derived `Default`).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct Stats {
    /// BBFRAMEs received successfully.
    bbframes: u64,
    /// Failed attempts to receive a BBFRAME.
    bbframe_errors: u64,
    /// PDUs handed to `write_pdu_tun` (including those dropped afterwards).
    gse_pdus: u64,
    /// PDUs discarded because their label was not allowed.
    gse_pdus_dropped_by_label: u64,
    /// Failed writes to the TUN device.
    tun_errors: u64,
}
236
237fn report_stats(stats: &Mutex<Stats>, interval: Duration) {
238    loop {
239        {
240            let stats = stats.lock().unwrap();
241            log::info!(
242                "BBFRAMES: {}, BBFRAME errors: {}, GSE PDUs: {}, GSE PDUs dropped by label: {}, TUN errors: {}",
243                stats.bbframes,
244                stats.bbframe_errors,
245                stats.gse_pdus,
246                stats.gse_pdus_dropped_by_label,
247                stats.tun_errors
248            );
249        }
250        std::thread::sleep(interval);
251    }
252}
253
254/// Main function of the CLI application.
255pub fn main() -> Result<()> {
256    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
257    let args = Args::parse();
258    let mut tun = tun_tap::Iface::without_packet_info(&args.tun, tun_tap::Mode::Tun)
259        .context("failed to open TUN device")?;
260    log::info!("dvb-gse v{} started", env!("CARGO_PKG_VERSION"));
261    let stats = Arc::new(Mutex::new(Stats::default()));
262    if args.stats_interval != 0.0 {
263        std::thread::spawn({
264            let stats = Arc::clone(&stats);
265            move || {
266                report_stats(&stats, Duration::from_secs_f64(args.stats_interval));
267            }
268        });
269    }
270    match args.input {
271        InputFormat::UdpFragments | InputFormat::UdpComplete => {
272            let gsepacket_defrag = gsepacket_defragmenter(&args);
273            let socket = UdpSocket::bind(args.listen).context("failed to bind to UDP socket")?;
274            setup_multicast(&socket, &args.listen)?;
275            let allow_settings = AllowSettings::from(&args);
276            match args.input {
277                InputFormat::UdpFragments => {
278                    let mut bbframe_recv = BBFrameDefrag::new(socket);
279                    bbframe_recv.set_isi(args.isi);
280                    bbframe_recv.set_header_bytes(args.header_length)?;
281                    let mut app = AppLoop {
282                        bbframe_recv,
283                        gsepacket_defrag,
284                        tun,
285                        bbframe_recv_errors_fatal: true,
286                        stats,
287                        allow_settings,
288                    };
289                    app.app_loop()?;
290                }
291                InputFormat::UdpComplete => {
292                    let mut bbframe_recv = BBFrameRecv::new(socket);
293                    bbframe_recv.set_isi(args.isi);
294                    bbframe_recv.set_header_bytes(args.header_length)?;
295                    let mut app = AppLoop {
296                        bbframe_recv,
297                        gsepacket_defrag,
298                        tun,
299                        bbframe_recv_errors_fatal: false,
300                        stats,
301                        allow_settings,
302                    };
303                    app.app_loop()?;
304                }
305                _ => unreachable!(),
306            }
307        }
308        InputFormat::Tcp => {
309            let listener =
310                TcpListener::bind(args.listen).context("failed to bind to TCP socket")?;
311            // For TCP, the application runs each TCP connection in a dedicated
312            // thread. There is another thread that owns the TUN. The TCP
313            // connection threads are connected to the TUN thread by an mpsc
314            // channel.
315            let channel_capacity = 64;
316            let (tun_tx, tun_rx) = mpsc::sync_channel(channel_capacity);
317            let allow_settings = AllowSettings::from(&args);
318            thread::spawn({
319                let stats = Arc::clone(&stats);
320                move || {
321                    for pdu in tun_rx.iter() {
322                        write_pdu_tun(&pdu, &mut tun, &mut stats.lock().unwrap(), &allow_settings);
323                    }
324                }
325            });
326            // use thread scope to pass args by reference
327            thread::scope(|s| {
328                for stream in listener.incoming() {
329                    let stream = match stream {
330                        Ok(s) => s,
331                        Err(e) => {
332                            log::error!("connection error {e}");
333                            continue;
334                        }
335                    };
336                    match stream.peer_addr() {
337                        Ok(addr) => log::info!("TCP client connected from {addr}"),
338                        Err(err) => log::error!(
339                            "TCP client connected (but could not retrieve peer address): {err}"
340                        ),
341                    }
342                    s.spawn({
343                        let args = &args;
344                        let tun_tx = tun_tx.clone();
345                        let stats = &stats;
346                        move || {
347                            let mut gsepacket_defrag = gsepacket_defragmenter(args);
348                            let mut bbframe_recv = BBFrameStream::new(stream);
349                            bbframe_recv.set_isi(args.isi);
350                            if let Err(err) = bbframe_recv.set_header_bytes(args.header_length) {
351                                eprintln!("could not set header length: {err}");
352                                std::process::exit(1);
353                            }
354                            loop {
355                                let bbframe = bbframe_recv.get_bbframe();
356                                let bbframe = {
357                                    let mut stats = stats.lock().unwrap();
358                                    match bbframe {
359                                        Ok(b) => {
360                                            stats.bbframes += 1;
361                                            b
362                                        }
363                                        Err(err) => {
364                                            log::error!("failed to receive BBFRAME; terminating connection: {err}");
365                                            stats.bbframe_errors += 1;
366                                            return;
367                                        }
368                                    }
369                                };
370                                // the BBFRAME was validated by bbframe_recv, so we can unwrap here
371                                for pdu in gsepacket_defrag.defragment(&bbframe).unwrap() {
372                                    tun_tx.send(pdu).unwrap();
373                                }
374                            }
375                        }
376                    });
377                }
378            });
379        }
380    }
381    Ok(())
382}