Skip to main content

twinleaf_tools/tools/
proxy.rs

1//! tio proxy
2//!
3//! Multiplexes access to one or more sensors, exposing the functionality of
4//! tio::proxy via TCP. With `--mount`, each sensor hangs off a route prefix
5//! and the proxy presents the set as a single virtual hub.
6
7use crate::{MountArg, ProxyCli, ProxySubcommands};
8use std::io;
9use std::net::TcpListener;
10use std::time::Duration;
11use twinleaf::device::discovery::{self, PortInterface};
12use twinleaf::tio::{self, proto, proxy};
13
14fn init_proxy_logging(verbose: bool, debug: bool) {
15    use std::io::Write;
16    let level_filter = if debug {
17        "trace"
18    } else if verbose {
19        "debug"
20    } else {
21        "info,device=debug"
22    };
23    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(level_filter))
24        .format(|buf, record| {
25            let level = record.level();
26            let level_style = buf.default_level_style(level);
27            let target = record.target();
28            let source = target
29                .strip_prefix("device::")
30                .unwrap_or_else(|| target.rsplit("::").next().unwrap_or(target));
31            let bold = env_logger::fmt::style::Style::new().bold();
32            let ts = chrono::Local::now().format("%T%.3f");
33            writeln!(
34                buf,
35                "{ts} {level_style}{level:5}{level_style:#} {bold}{source}:{bold:#} {}",
36                record.args()
37            )
38        })
39        .init();
40}
41
42pub fn run_proxy(mut proxy_cli: ProxyCli) -> eyre::Result<()> {
43    match proxy_cli.subcommands.take() {
44        Some(ProxySubcommands::Nmea { tio, tcp_port }) => {
45            init_proxy_logging(false, false);
46            crate::tools::proxy_nmea::run_nmea_proxy(tio, tcp_port)
47        }
48        None => {
49            init_proxy_logging(proxy_cli.verbose, proxy_cli.debug);
50            if proxy_cli.timestamp_format != "%T%.3f " {
51                log::warn!(
52                    "--timestamp is deprecated and no longer applied; \
53                     timestamps are emitted by the logger"
54                );
55            }
56            if proxy_cli.enumerate {
57                return crate::tools::list::list_devices_deprecated(true);
58            }
59            if proxy_cli.auto {
60                log::warn!(
61                    "'--auto' is deprecated; running without a URL now auto-detects by default"
62                );
63            }
64            let mounts = std::mem::take(&mut proxy_cli.mounts);
65            let layout = Layout::from_cli(mounts, proxy_cli.sensor_url.take())?;
66            let server = ProxyServer {
67                config: ProxyConfig::from(&proxy_cli),
68                layout,
69            };
70            server.run()
71        }
72    }
73}
74
75/// Server settings, fixed at startup.
76#[derive(Debug, Clone)]
77struct ProxyConfig {
78    tcp_port: u16,
79    reconnect_timeout: Duration,
80    disconnect_slow: bool,
81    verbose: bool,
82    debugging: bool,
83    subtree: proto::DeviceRoute,
84    dump_traffic: bool,
85    dump_data: bool,
86    dump_meta: bool,
87    dump_hb: bool,
88}
89
90impl From<&ProxyCli> for ProxyConfig {
91    fn from(cli: &ProxyCli) -> Self {
92        Self {
93            tcp_port: cli.port,
94            reconnect_timeout: Duration::from_secs(cli.reconnect_timeout),
95            disconnect_slow: cli.kick_slow,
96            verbose: cli.verbose,
97            debugging: cli.debug,
98            subtree: cli.subtree.clone(),
99            dump_traffic: cli.dump,
100            dump_data: cli.dump_data,
101            dump_meta: cli.dump_meta,
102            dump_hb: cli.dump_hb,
103        }
104    }
105}
106
107/// One upstream sensor bound to a route prefix. A single device without
108/// `--mount` sits at the root prefix.
109#[derive(Debug, Clone)]
110struct Mount {
111    locator: String,
112    prefix: proto::DeviceRoute,
113    auto_detected: bool,
114}
115
116#[derive(Debug, Clone)]
117struct Layout {
118    mounts: Vec<Mount>,
119}
120
121impl Layout {
122    fn from_cli(mount_args: Vec<MountArg>, sensor_url: Option<String>) -> eyre::Result<Layout> {
123        if mount_args.is_empty() {
124            return Ok(Layout {
125                mounts: vec![resolve_root_mount(sensor_url)?],
126            });
127        }
128        let mut prefixes = std::collections::HashSet::new();
129        for arg in &mount_args {
130            if !prefixes.insert(arg.prefix.clone()) {
131                return Err(eyre::eyre!("duplicate mount prefix {}", arg.prefix));
132            }
133        }
134        Ok(Layout {
135            mounts: mount_args
136                .into_iter()
137                .map(|arg| Mount {
138                    locator: arg.locator,
139                    prefix: arg.prefix,
140                    auto_detected: false,
141                })
142                .collect(),
143        })
144    }
145}
146
147/// Resolve the sensor URL into a root-prefix `Mount`, auto-detecting if no
148/// URL was given.
149fn resolve_root_mount(sensor_url: Option<String>) -> eyre::Result<Mount> {
150    use color_eyre::Help;
151
152    let auto_detected = sensor_url.is_none();
153    let locator = if let Some(url) = sensor_url {
154        url
155    } else {
156        let devices = discovery::enumerate_serial(false);
157        let mut valid_urls = Vec::new();
158        for dev in devices {
159            match dev.interface {
160                PortInterface::STM32 | PortInterface::FTDI => {
161                    valid_urls.push(dev.url.clone());
162                }
163                _ => {}
164            }
165        }
166        if valid_urls.is_empty() {
167            return Err(eyre::eyre!("no sensors detected")
168                .suggestion("specify a URL with -s <url>, or run 'tio list'"));
169        }
170        if valid_urls.len() > 1 {
171            eprintln!("multiple sensors detected:");
172            let query_timeout = Duration::from_millis(500);
173            for url in &valid_urls {
174                match discovery::query_name(url, query_timeout) {
175                    Some(name) => eprintln!("  {}  {}", url, name),
176                    None => eprintln!("  {}  (no response)", url),
177                }
178            }
179            return Err(eyre::eyre!("multiple sensors detected, cannot auto-select")
180                .suggestion("specify one with -s <url>")
181                .suggestion("or mount each at a route prefix with --mount <url>=/N"));
182        }
183        valid_urls.swap_remove(0)
184    };
185
186    Ok(Mount {
187        locator,
188        prefix: proto::DeviceRoute::root(),
189        auto_detected,
190    })
191}
192
193/// A mounted device's live connection: the proxy interface, its status
194/// events, and the server's own monitoring port on it.
195struct DeviceLink {
196    prefix: proto::DeviceRoute,
197    interface: proxy::Interface,
198    status_rx: crossbeam::channel::Receiver<proxy::Event>,
199    monitor_port: proxy::Port,
200}
201
202/// What a ready `Select` slot in the server loop corresponds to, recorded
203/// at registration so readiness never has to be decoded from index math.
204#[derive(Clone, Copy)]
205enum Source<'a> {
206    NewClient,
207    Status(&'a DeviceLink),
208    DevicePacket(&'a DeviceLink),
209}
210
/// Reason a client's forwarding thread stopped; decides the final log line.
enum Disconnect {
    /// The client's receive channel ended or sending to the client failed
    /// for a reason other than a full queue.
    ClientClosed,
    /// The client's queue was full and slow clients are being disconnected.
    TooSlow,
    /// Receiving from one of the proxy ports failed.
    PortReceiveFailed,
    /// Handing a client packet to a proxy port failed.
    PortForwardFailed,
}
218
219fn is_rpc(payload: &proto::Payload) -> bool {
220    matches!(
221        payload,
222        proto::Payload::RpcRequest(_) | proto::Payload::RpcReply(_) | proto::Payload::RpcError(_)
223    )
224}
225
/// Bookkeeping for a client that falls behind, so that the start of the
/// packet loss and the recovery from it are each logged once instead of
/// once per packet.
#[derive(Default)]
struct SlowTracker {
    /// Set while the client is in a stretch of dropped packets.
    is_slow: bool,
    /// Packets dropped during the current stretch.
    dropped: usize,
}
233
234impl SlowTracker {
235    fn packet_dropped(&mut self, addr: &str) {
236        if !log::log_enabled!(log::Level::Debug) {
237            return;
238        }
239        if !self.is_slow {
240            self.is_slow = true;
241            log::debug!("Client {} is not keeping up and is dropping packets", addr);
242        }
243        self.dropped += 1;
244    }
245
246    fn packet_delivered(&mut self, addr: &str) {
247        if self.is_slow {
248            log::debug!(
249                "Client {} resuming after having dropped {} packets",
250                addr,
251                self.dropped
252            );
253            self.is_slow = false;
254            self.dropped = 0;
255        }
256    }
257}
258
259/// The server fronting the TCP port: fans the mounted devices' traffic out
260/// to TCP clients.
261struct ProxyServer {
262    config: ProxyConfig,
263    layout: Layout,
264}
265
266impl ProxyServer {
267    fn run(self) -> eyre::Result<()> {
268        use color_eyre::{Help, SectionExt};
269        use eyre::bail;
270
271        self.print_startup();
272
273        let new_client = self.start_listeners()?;
274
275        let mut links = Vec::with_capacity(self.layout.mounts.len());
276        for mount in &self.layout.mounts {
277            let (status_send, status_rx) = crossbeam::channel::bounded::<proxy::Event>(100);
278            let interface = proxy::Interface::new_proxy(
279                &mount.locator,
280                Some(self.config.reconnect_timeout),
281                Some(status_send),
282            );
283            // This is used by the proxy itself to communicate with the device
284            // tree, for now only to receive log messages and dump traffic.
285            let monitor_port = match interface.subtree_full(self.config.subtree.clone()) {
286                Ok(port) => port,
287                Err(e) => {
288                    let last_status = status_rx.iter().last();
289                    let err = eyre::Report::new(e)
290                        .wrap_err(format!("could not open port on {}", mount.locator));
291                    return Err(if let Some(status) = last_status {
292                        err.with_section(move || {
293                            format!("{:?}", status).header("Last proxy event:")
294                        })
295                    } else {
296                        err
297                    });
298                }
299            };
300            links.push(DeviceLink {
301                prefix: mount.prefix.clone(),
302                interface,
303                status_rx,
304                monitor_port,
305            });
306        }
307
308        let mut sel = crossbeam::channel::Select::new();
309        let mut sources = Vec::with_capacity(1 + 2 * links.len());
310        sel.recv(&new_client);
311        sources.push(Source::NewClient);
312        for link in &links {
313            sel.recv(&link.status_rx);
314            sources.push(Source::Status(link));
315            sel.recv(link.monitor_port.receiver());
316            sources.push(Source::DevicePacket(link));
317        }
318
319        loop {
320            let oper = sel.select();
321            match sources[oper.index()] {
322                Source::NewClient => {
323                    let Ok(stream) = oper.recv(&new_client) else {
324                        bail!("listener thread died unexpectedly");
325                    };
326                    self.accept_client(stream, &links);
327                }
328                Source::Status(link) => {
329                    let Ok(evt) = oper.recv(&link.status_rx) else {
330                        // The proxy thread died, most likely due to the sensor
331                        // getting disconnected past the autoreconnection
332                        break;
333                    };
334                    log_proxy_event(evt, &link.prefix);
335                }
336                Source::DevicePacket(link) => {
337                    let Ok(pkt) = oper.recv(link.monitor_port.receiver()) else {
338                        break;
339                    };
340                    self.log_device_packet(pkt, &link.prefix);
341                }
342            }
343        }
344        Ok(())
345    }
346
347    fn print_startup(&self) {
348        println!("tio proxy starting:");
349        let mounts = &self.layout.mounts;
350        if mounts.len() == 1 && mounts[0].prefix.len() == 0 {
351            println!(
352                "  Sensor: {}{}",
353                mounts[0].locator,
354                if mounts[0].auto_detected {
355                    " (auto-detected)"
356                } else {
357                    ""
358                }
359            );
360        } else {
361            println!("  Mounts:");
362            for mount in mounts {
363                println!("    {}  {}", mount.prefix, mount.locator);
364            }
365        }
366        println!("  TCP port: {}", self.config.tcp_port);
367        println!("  Subtree: {}", self.config.subtree);
368
369        let flags = [
370            ("verbose", self.config.verbose),
371            ("debug", self.config.debugging),
372            ("kick-slow", self.config.disconnect_slow),
373            ("dump", self.config.dump_traffic),
374            ("dump-data", self.config.dump_data),
375            ("dump-meta", self.config.dump_meta),
376            ("dump-hb", self.config.dump_hb),
377        ];
378        let enabled: Vec<&str> = flags
379            .iter()
380            .filter_map(|&(name, on)| on.then_some(name))
381            .collect();
382        if !enabled.is_empty() {
383            println!("  Flags: {}", enabled.join(" "));
384        }
385        println!();
386    }
387
388    fn start_listeners(&self) -> eyre::Result<crossbeam::channel::Receiver<std::net::TcpStream>> {
389        use color_eyre::Help;
390
391        let (client_send, new_client) = crossbeam::channel::bounded::<std::net::TcpStream>(10);
392        let started_v6 = create_listener_thread(
393            std::net::SocketAddr::new(
394                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
395                self.config.tcp_port,
396            ),
397            client_send.clone(),
398        );
399        let started_v4 = if let (Ok(()), false) = (&started_v6, cfg!(windows)) {
400            // If v6 started correctly and we are not in windows, pretend
401            // v4 also started correctly. The OS will pass the new clients
402            // through the v6 socket.
403            Ok(())
404        } else {
405            create_listener_thread(
406                std::net::SocketAddr::new(
407                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
408                    self.config.tcp_port,
409                ),
410                client_send.clone(),
411            )
412        };
413        if let (Err(e1), Err(e2)) = (started_v6, started_v4) {
414            let addr_in_use = matches!(e1.kind(), io::ErrorKind::AddrInUse)
415                || matches!(e2.kind(), io::ErrorKind::AddrInUse);
416            let err = eyre::eyre!(
417                "could not bind TCP port {}: v6={}, v4={}",
418                self.config.tcp_port,
419                e1,
420                e2
421            );
422            return Err(if addr_in_use {
423                err.suggestion(format!(
424                    "another 'tio proxy' is likely running on port {}; try --port <N>",
425                    self.config.tcp_port
426                ))
427            } else {
428                err
429            });
430        }
431        Ok(new_client)
432    }
433
434    fn accept_client(&self, stream: std::net::TcpStream, links: &[DeviceLink]) {
435        let addr = match stream.peer_addr() {
436            Ok(addr) => addr.to_string(),
437            Err(err) => {
438                log::warn!("Failed to determine client address: {:?}", err);
439                return;
440            }
441        };
442        // A client from the proxy perspective is a port in reverse, i.e. what it receives
443        // is what a client transmits, and vice-versa. Therefore, the channel size settings
444        // for rx and tx are inverted. Also, we use the proxy port channel size setting
445        // instead of the physical ports setting.
446        let (rx_send, client_rx) =
447            tio::port::Port::rx_channel_custom(proxy::Interface::get_client_tx_channel_size());
448        let client = match tio::port::Port::from_tcp_stream_custom(
449            stream,
450            tio::port::Port::rx_to_channel(rx_send),
451            proxy::Interface::get_client_rx_channel_size(),
452        ) {
453            Ok(client_port) => client_port,
454            _ => return,
455        };
456
457        log::debug!("Accepted client from {}", addr);
458        let mut ports = Vec::with_capacity(links.len());
459        for link in links {
460            let port = link
461                .interface
462                .new_port(
463                    Some(Duration::from_millis(2000)),
464                    self.config.subtree.clone(),
465                    usize::MAX,
466                    true,
467                    true,
468                )
469                .expect("Failed to create new proxy port");
470            ports.push((link.prefix.clone(), port));
471        }
472
473        let dump_traffic = self.config.dump_traffic;
474        let disconnect_slow = self.config.disconnect_slow;
475        std::thread::spawn(move || {
476            let mut slow = SlowTracker::default();
477
478            // Slot 0 is the client's own traffic; slot 1 + i is ports[i].
479            let mut sel = crossbeam::channel::Select::new();
480            sel.recv(&client_rx);
481            for (_, port) in &ports {
482                sel.recv(port.receiver());
483            }
484
485            let reason = loop {
486                let oper = sel.select();
487                match oper.index() {
488                    0 => {
489                        let Ok(Ok(mut pkt)) = oper.recv(&client_rx) else {
490                            break Disconnect::ClientClosed;
491                        };
492                        if dump_traffic {
493                            log::info!("{}->{} -- {:?}", addr, pkt.routing, pkt.payload);
494                        }
495                        let mut dest = None;
496                        for (prefix, port) in &ports {
497                            if let Ok(relative) = prefix.relative_route(&pkt.routing) {
498                                dest = Some((relative, port));
499                                break;
500                            }
501                        }
502                        let Some((relative, port)) = dest else {
503                            log::debug!(
504                                "Client {} addressed unmounted route {}",
505                                addr,
506                                pkt.routing
507                            );
508                            continue;
509                        };
510                        pkt.routing = relative;
511                        if port.try_send(pkt).is_err() {
512                            break Disconnect::PortForwardFailed;
513                        }
514                    }
515                    i => {
516                        let (prefix, port) = &ports[i - 1];
517                        let Ok(mut pkt) = oper.recv(port.receiver()) else {
518                            break Disconnect::PortReceiveFailed;
519                        };
520                        pkt.routing = prefix.absolute_route(&pkt.routing);
521                        if pkt.routing.len() > proto::TIO_PACKET_MAX_ROUTING_SIZE {
522                            log::warn!(
523                                "Dropping packet for client {}: route {} exceeds max depth",
524                                addr,
525                                pkt.routing
526                            );
527                            continue;
528                        }
529                        if dump_traffic && is_rpc(&pkt.payload) {
530                            log::info!("{}->{} -- {:?}", pkt.routing, addr, pkt.payload);
531                        }
532                        match client.try_send(pkt) {
533                            Ok(()) => slow.packet_delivered(&addr),
534                            Err(tio::SendError::Full) if !disconnect_slow => {
535                                slow.packet_dropped(&addr)
536                            }
537                            Err(tio::SendError::Full) => break Disconnect::TooSlow,
538                            Err(_) => break Disconnect::ClientClosed,
539                        }
540                    }
541                }
542            };
543
544            match reason {
545                Disconnect::ClientClosed => log::debug!("Client {} exiting", addr),
546                Disconnect::TooSlow => {
547                    log::warn!("Disconnecting client {} due to slowness", addr)
548                }
549                Disconnect::PortReceiveFailed => log::warn!(
550                    "Disconnecting client {} due to internal error receiving tio data in thread",
551                    addr
552                ),
553                Disconnect::PortForwardFailed => log::warn!(
554                    "Disconnecting client {} due to internal error forwarding tio data in thread",
555                    addr
556                ),
557            }
558        });
559    }
560
561    fn log_device_packet(&self, mut pkt: proto::Packet, prefix: &proto::DeviceRoute) {
562        pkt.routing = prefix.absolute_route(&pkt.routing);
563        let dump = match pkt.payload {
564            proto::Payload::Heartbeat(_) => self.config.dump_hb,
565            proto::Payload::Metadata(_) => self.config.dump_meta,
566            proto::Payload::StreamData(_) => self.config.dump_data,
567            _ => self.config.dump_traffic,
568        };
569        if dump {
570            log::info!("Packet from {} -- {:?}", pkt.routing, pkt.payload);
571        }
572        if let proto::Payload::LogMessage(log_msg) = pkt.payload {
573            // Map the device-reported level onto the log crate's level
574            // so the logger filter and prefix reflect it.
575            let level = match &log_msg.level {
576                proto::LogLevel::Critical | proto::LogLevel::Error => log::Level::Error,
577                proto::LogLevel::Warning => log::Level::Warn,
578                proto::LogLevel::Info => log::Level::Info,
579                proto::LogLevel::Debug => log::Level::Debug,
580                proto::LogLevel::Unknown(_) => log::Level::Info,
581            };
582            log::log!(target: &format!("device::{}", pkt.routing), level, "{}", log_msg.message);
583        }
584    }
585}
586
587fn create_listener_thread(
588    addr: std::net::SocketAddr,
589    client_send: crossbeam::channel::Sender<std::net::TcpStream>,
590) -> io::Result<()> {
591    let listener = TcpListener::bind(addr)?;
592    std::thread::Builder::new()
593        .name("listener".to_string())
594        .spawn(move || {
595            for res in listener.incoming() {
596                match res {
597                    Ok(stream) => client_send.send(stream).expect("New client queue full"),
598                    Err(err) => eprintln!("error accepting client: {}", err),
599                };
600            }
601        })?;
602    Ok(())
603}
604
605fn log_proxy_event(evt: proxy::Event, prefix: &proto::DeviceRoute) {
606    let target = format!("proxy::{}", prefix);
607    let target = target.as_str();
608    match evt {
609        proxy::Event::SensorDisconnected => {
610            log::warn!(target: target, "Sensor disconnected");
611        }
612        proxy::Event::SensorReconnected => {
613            log::info!(target: target, "Sensor reconnected");
614        }
615        proxy::Event::FailedToReconnect => {
616            log::error!(target: target, "Stopping reconnection attempts due to timeout");
617        }
618        proxy::Event::FailedToConnect => {
619            log::error!(target: target, "Fatal proxy error: failed to connect to sensor");
620        }
621        proxy::Event::FatalError(err) => {
622            log::error!(target: target, "Fatal proxy error: {:?}", err);
623            // the proxy thread will exit and we'll detect it at the next iteration.
624        }
625        proxy::Event::ProtocolError(perr) => match perr {
626            proto::Error::Text(txt) => {
627                log::info!(target: target, "Text: {}", txt);
628            }
629            other => {
630                log::debug!(target: target, "Protocol error: {:?}", other);
631            }
632        },
633        evt => {
634            log::trace!(target: target, "{:?}", evt);
635        }
636    }
637}