dvb_gse/
cli.rs

1//! CLI application.
2//!
3//! This module implements a CLI application that receives UDP or TCP packets
4//! containing BBFRAMEs from an external DVB-S2 receiver such as
5//! [`Longmynd`](https://github.com/BritishAmateurTelevisionClub/longmynd). It
//! obtains IP packets from a continuous-mode GSE stream, and sends the IP
7//! packets to a TUN device.
8
9use crate::{
10    bbframe::{BBFrameDefrag, BBFrameReceiver, BBFrameRecv, BBFrameStream},
11    gsepacket::{GSEPacketDefrag, PDU},
12};
13use anyhow::{Context, Result};
14use clap::Parser;
15use std::{
16    net::{SocketAddr, TcpListener, UdpSocket},
17    os::unix::io::AsRawFd,
18    sync::{mpsc, Arc, Mutex},
19    thread,
20    time::Duration,
21};
22
// Command-line arguments of the application, parsed in `main()` with
// `Args::parse()`.
//
// NOTE: clap's derive API uses the `///` doc comments in this struct as the
// text shown by `--help`, so they are user-facing strings. Maintainer notes
// are therefore written as plain `//` comments.
/// Receive DVB-GSE and send PDUs into a TUN device
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    // Bind address of the UDP socket (UDP input formats) or of the TCP
    // listener (TCP input format). For the UDP formats, `setup_multicast()`
    // additionally joins the group when this is a multicast address.
    /// IP address and port to listen on to receive DVB-S2 BBFRAMEs
    #[arg(long)]
    listen: SocketAddr,
    // Name handed to `tun_tap::Iface::without_packet_info()` in `main()`.
    /// TUN interface name
    #[arg(long)]
    tun: String,
    // The accepted strings are those matched by `InputFormat::from_str`
    // (which also accepts the shorthand "UDP"). `default_value_t` without an
    // explicit value presumably takes `InputFormat::default()`, i.e.
    // `UdpFragments` -- see clap's derive reference to confirm.
    /// Input format: "UDP fragments", "UDP complete", or "TCP"
    #[arg(long, default_value_t)]
    input: InputFormat,
    // Forwarded to `set_header_bytes()` of the selected BBFRAME receiver.
    /// Input header length (the header is discarded)
    #[arg(long, default_value_t = 0)]
    header_length: usize,
    // Forwarded unchanged to `set_isi()` of the selected BBFRAME receiver.
    /// ISI to process in MIS mode (if this option is not specified, run in SIS mode)
    #[arg(long)]
    isi: Option<u8>,
    // `main()` does not start the statistics thread when this is 0.
    /// Time interval used to log statistics (in seconds)
    #[arg(long, default_value_t = 100.0)]
    stats_interval: f64,
    // Forwarded to `GSEPacketDefrag::set_skip_total_length_check()` by
    // `gsepacket_defragmenter()`.
    /// Skip checking the GSE total length field
    #[arg(long)]
    skip_total_length: bool,
}
49
/// Transport and framing used to deliver BBFRAMEs to the application.
///
/// The textual names used on the command line are defined by the
/// [`std::str::FromStr`] and [`std::fmt::Display`] implementations below.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default)]
enum InputFormat {
    /// Each UDP datagram carries a fragment of a BBFRAME (default).
    #[default]
    UdpFragments,
    /// Each UDP datagram carries a complete BBFRAME.
    UdpComplete,
    /// BBFRAMEs are carried in a TCP stream.
    Tcp,
}

impl std::str::FromStr for InputFormat {
    type Err = String;

    /// Parses the command-line name of an input format.
    ///
    /// Matching is exact and case-sensitive. `"UDP"` is accepted as a
    /// shorthand for `"UDP fragments"`. Any other string yields an error
    /// message that quotes the rejected input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "UDP" | "UDP fragments" => Ok(Self::UdpFragments),
            "UDP complete" => Ok(Self::UdpComplete),
            "TCP" => Ok(Self::Tcp),
            other => Err(format!("invalid input format {other}")),
        }
    }
}

impl std::fmt::Display for InputFormat {
    /// Writes the canonical command-line name of the input format.
    ///
    /// Width and padding options of the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        let name = match self {
            Self::UdpFragments => "UDP fragments",
            Self::UdpComplete => "UDP complete",
            Self::Tcp => "TCP",
        };
        f.write_str(name)
    }
}
87
88fn setup_multicast(socket: &UdpSocket, addr: &SocketAddr) -> Result<()> {
89    match addr.ip() {
90        std::net::IpAddr::V4(addr) if addr.is_multicast() => {
91            set_reuseaddr(socket)?;
92            log::info!("joining multicast address {}", addr);
93            socket.join_multicast_v4(&addr, &std::net::Ipv4Addr::UNSPECIFIED)?;
94        }
95        std::net::IpAddr::V6(addr) if addr.is_multicast() => {
96            set_reuseaddr(socket)?;
97            log::info!("joining multicast address {}", addr);
98            socket.join_multicast_v6(&addr, 0)?;
99        }
100        _ => (),
101    }
102    Ok(())
103}
104
105fn set_reuseaddr(socket: &UdpSocket) -> Result<()> {
106    let optval: libc::c_int = 1;
107    if unsafe {
108        libc::setsockopt(
109            socket.as_raw_fd(),
110            libc::SOL_SOCKET,
111            libc::SO_REUSEADDR,
112            &optval as *const _ as *const libc::c_void,
113            libc::socklen_t::try_from(std::mem::size_of::<libc::c_int>()).unwrap(),
114        )
115    } != 0
116    {
117        let err = std::io::Error::last_os_error();
118        anyhow::bail!("could not set SO_REUSEADDR: {err}")
119    }
120    Ok(())
121}
122
/// State of the single-threaded receive loop.
///
/// `main()` builds one of these for each of the UDP input formats. `D` is the
/// BBFRAME source; [`AppLoop::app_loop`] requires it to implement
/// [`BBFrameReceiver`].
#[derive(Debug)]
struct AppLoop<D> {
    /// Source from which BBFRAMEs are obtained.
    bbframe_recv: D,
    /// Defragmenter that turns the received BBFRAMEs into PDUs.
    gsepacket_defrag: GSEPacketDefrag,
    /// TUN device to which the PDUs are written.
    tun: tun_tap::Iface,
    /// If `true`, a BBFRAME receive error makes the loop return with an
    /// error. If `false`, the error is only counted and the loop continues.
    bbframe_recv_errors_fatal: bool,
    /// Statistics counters, shared with the statistics reporting thread.
    stats: Arc<Mutex<Stats>>,
}
131
132fn write_pdu_tun(pdu: &PDU, tun: &mut tun_tap::Iface, stats: &mut Stats) {
133    if let Err(err) = tun.send(pdu.data()) {
134        log::error!("could not write packet to TUN device: {err}");
135        stats.tun_errors += 1;
136    }
137}
138
139impl<D: BBFrameReceiver> AppLoop<D> {
140    fn app_loop(&mut self) -> Result<()> {
141        loop {
142            let bbframe = self.bbframe_recv.get_bbframe();
143            let mut stats = self.stats.lock().unwrap();
144            let bbframe = match bbframe {
145                Ok(b) => {
146                    stats.bbframes += 1;
147                    b
148                }
149                Err(err) => {
150                    stats.bbframe_errors += 1;
151                    if self.bbframe_recv_errors_fatal {
152                        return Err(err).context("failed to receive BBFRAME");
153                    } else {
154                        continue;
155                    }
156                }
157            };
158            // the BBFRAME was validated by bbframe_recv, so we can unwrap here
159            for pdu in self.gsepacket_defrag.defragment(&bbframe).unwrap() {
160                stats.gse_packets += 1;
161                write_pdu_tun(&pdu, &mut self.tun, &mut stats);
162            }
163            // drop stats mutex lock explicitly, for good measure in case code
164            // is added below
165            drop(stats);
166        }
167    }
168}
169
170fn gsepacket_defragmenter(args: &Args) -> GSEPacketDefrag {
171    let mut defrag = GSEPacketDefrag::new();
172    defrag.set_skip_total_length_check(args.skip_total_length);
173    defrag
174}
175
/// Counters reported periodically by [`report_stats`].
///
/// All counters start at zero (`Stats::default()`) and are only ever
/// incremented.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
struct Stats {
    /// Number of BBFRAMEs received successfully.
    bbframes: u64,
    /// Number of failed attempts to receive a BBFRAME.
    bbframe_errors: u64,
    /// Number of PDUs obtained from GSE defragmentation.
    gse_packets: u64,
    /// Number of failed writes to the TUN device.
    tun_errors: u64,
}
183
184fn report_stats(stats: &Mutex<Stats>, interval: Duration) {
185    loop {
186        {
187            let stats = stats.lock().unwrap();
188            log::info!(
189                "BBFRAMES: {}, BBFRAME errors: {}, GSE packets: {}, TUN errors: {}",
190                stats.bbframes,
191                stats.bbframe_errors,
192                stats.gse_packets,
193                stats.tun_errors
194            );
195        }
196        std::thread::sleep(interval);
197    }
198}
199
200/// Main function of the CLI application.
201pub fn main() -> Result<()> {
202    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
203    let args = Args::parse();
204    let mut tun = tun_tap::Iface::without_packet_info(&args.tun, tun_tap::Mode::Tun)
205        .context("failed to open TUN device")?;
206    log::info!("dvb-gse v{} started", env!("CARGO_PKG_VERSION"));
207    let stats = Arc::new(Mutex::new(Stats::default()));
208    if args.stats_interval != 0.0 {
209        std::thread::spawn({
210            let stats = Arc::clone(&stats);
211            move || {
212                report_stats(&stats, Duration::from_secs_f64(args.stats_interval));
213            }
214        });
215    }
216    match args.input {
217        InputFormat::UdpFragments | InputFormat::UdpComplete => {
218            let gsepacket_defrag = gsepacket_defragmenter(&args);
219            let socket = UdpSocket::bind(args.listen).context("failed to bind to UDP socket")?;
220            setup_multicast(&socket, &args.listen)?;
221            match args.input {
222                InputFormat::UdpFragments => {
223                    let mut bbframe_recv = BBFrameDefrag::new(socket);
224                    bbframe_recv.set_isi(args.isi);
225                    bbframe_recv.set_header_bytes(args.header_length)?;
226                    let mut app = AppLoop {
227                        bbframe_recv,
228                        gsepacket_defrag,
229                        tun,
230                        bbframe_recv_errors_fatal: true,
231                        stats,
232                    };
233                    app.app_loop()?;
234                }
235                InputFormat::UdpComplete => {
236                    let mut bbframe_recv = BBFrameRecv::new(socket);
237                    bbframe_recv.set_isi(args.isi);
238                    bbframe_recv.set_header_bytes(args.header_length)?;
239                    let mut app = AppLoop {
240                        bbframe_recv,
241                        gsepacket_defrag,
242                        tun,
243                        bbframe_recv_errors_fatal: false,
244                        stats,
245                    };
246                    app.app_loop()?;
247                }
248                _ => unreachable!(),
249            }
250        }
251        InputFormat::Tcp => {
252            let listener =
253                TcpListener::bind(args.listen).context("failed to bind to TCP socket")?;
254            // For TCP, the application runs each TCP connection in a dedicated
255            // thread. There is another thread that owns the TUN. The TCP
256            // connection threads are connected to the TUN thread by an mpsc
257            // channel.
258            let channel_capacity = 64;
259            let (tun_tx, tun_rx) = mpsc::sync_channel(channel_capacity);
260            thread::spawn({
261                let stats = Arc::clone(&stats);
262                move || {
263                    for pdu in tun_rx.iter() {
264                        write_pdu_tun(&pdu, &mut tun, &mut stats.lock().unwrap());
265                    }
266                }
267            });
268            // use thread scope to pass args by reference
269            thread::scope(|s| {
270                for stream in listener.incoming() {
271                    let stream = match stream {
272                        Ok(s) => s,
273                        Err(e) => {
274                            log::error!("connection error {e}");
275                            continue;
276                        }
277                    };
278                    match stream.peer_addr() {
279                        Ok(addr) => log::info!("TCP client connected from {addr}"),
280                        Err(err) => log::error!(
281                            "TCP client connected (but could not retrieve peer address): {err}"
282                        ),
283                    }
284                    s.spawn({
285                        let args = &args;
286                        let tun_tx = tun_tx.clone();
287                        let stats = &stats;
288                        move || {
289                            let mut gsepacket_defrag = gsepacket_defragmenter(args);
290                            let mut bbframe_recv = BBFrameStream::new(stream);
291                            bbframe_recv.set_isi(args.isi);
292                            if let Err(err) = bbframe_recv.set_header_bytes(args.header_length) {
293                                eprintln!("could not set header length: {err}");
294                                std::process::exit(1);
295                            }
296                            loop {
297                                let bbframe = bbframe_recv.get_bbframe();
298                                let bbframe = {
299                                    let mut stats = stats.lock().unwrap();
300                                    match bbframe {
301                                        Ok(b) => {
302                                            stats.bbframes += 1;
303                                            b
304                                        }
305                                        Err(err) => {
306                                            log::error!("failed to receive BBFRAME; terminating connection: {err}");
307                                            stats.bbframe_errors += 1;
308                                            return;
309                                        }
310                                    }
311                                };
312                                // the BBFRAME was validated by bbframe_recv, so we can unwrap here
313                                for pdu in gsepacket_defrag.defragment(&bbframe).unwrap() {
314                                    tun_tx.send(pdu).unwrap();
315                                }
316                            }
317                        }
318                    });
319                }
320            });
321        }
322    }
323    Ok(())
324}