Skip to main content

twinleaf_tools/tools/
proxy.rs

1//! tio proxy
2//!
3//! Multiplexes access to a sensor, exposing the functionality of tio::proxy
4//! via TCP.
5
6use crate::ProxyCli;
7use std::io;
8use std::net::TcpListener;
9use std::time::Duration;
10use tio::{proto, proxy};
11use twinleaf::device::discovery::{self, PortInterface};
12use twinleaf::tio;
13
// Prints one line to stdout: the current local time rendered with the chrono
// format string `$stamp_fmt`, immediately followed by the message.
//
// Two call shapes are accepted:
//   log!(fmt, message)                 -- `message` is anything displayable
//   log!(fmt, "template {}", args...)  -- the template is expanded via `format!`
macro_rules! log {
    ($stamp_fmt:expr, $text:expr) => {{
        println!("{}{}", chrono::Local::now().format(&$stamp_fmt), $text);
    }};
    ($stamp_fmt:expr, $template:expr, $($args:tt)*) => {{
        println!(
            "{}{}",
            chrono::Local::now().format(&$stamp_fmt),
            format!($template, $($args)*)
        );
    }};
}
26
27fn create_listener_thread(
28    addr: std::net::SocketAddr,
29    client_send: crossbeam::channel::Sender<std::net::TcpStream>,
30) -> io::Result<()> {
31    let listener = TcpListener::bind(addr)?;
32    std::thread::Builder::new()
33        .name("listener".to_string())
34        .spawn(move || {
35            for res in listener.incoming() {
36                match res {
37                    Ok(stream) => client_send.send(stream).expect("New client queue full"),
38                    Err(err) => eprintln!("error accepting client: {}", err),
39                };
40            }
41        })?;
42    Ok(())
43}
44
45pub fn run_proxy(proxy_cli: ProxyCli) -> eyre::Result<()> {
46    use color_eyre::{Help, SectionExt};
47    use eyre::bail;
48
49    // Handle --enum mode (deprecated; now delegates to `tio list`)
50    if proxy_cli.enumerate {
51        return crate::tools::list::list_devices_deprecated(true);
52    }
53
54    if proxy_cli.auto {
55        eprintln!(
56            "warning: '--auto' is deprecated; running without -s <url> now auto-detects by default"
57        );
58    }
59
60    let tcp_port = proxy_cli.port;
61    let reconnect_timeout = Duration::from_secs(proxy_cli.reconnect_timeout);
62    let disconnect_slow = proxy_cli.kick_slow;
63    let verbose = proxy_cli.verbose;
64    let debugging = proxy_cli.debug;
65    let dump_traffic = proxy_cli.dump;
66    let dump_data = proxy_cli.dump_data;
67    let dump_meta = proxy_cli.dump_meta;
68    let dump_hb = proxy_cli.dump_hb;
69    let tf = proxy_cli.timestamp_format;
70
71    // Determine sensor URL; if none given, auto-detect.
72    let auto_detected = proxy_cli.sensor_url.is_none();
73    let sensor_url = if let Some(url) = proxy_cli.sensor_url {
74        url
75    } else {
76        // --auto mode
77        let devices = discovery::enumerate_serial(false);
78        let mut valid_urls = Vec::new();
79        for dev in devices {
80            match dev.interface {
81                PortInterface::STM32 | PortInterface::FTDI => {
82                    valid_urls.push(dev.url.clone());
83                }
84                _ => {}
85            }
86        }
87        if valid_urls.len() == 0 {
88            return Err(eyre::eyre!("no sensors detected")
89                .suggestion("specify a URL with -s <url>, or run 'tio list'"));
90        }
91        if valid_urls.len() > 1 {
92            eprintln!("multiple sensors detected:");
93            let query_timeout = Duration::from_millis(500);
94            for url in &valid_urls {
95                match discovery::query_name(url, query_timeout) {
96                    Some(name) => eprintln!("  {}  {}", url, name),
97                    None => eprintln!("  {}  (no response)", url),
98                }
99            }
100            return Err(eyre::eyre!("multiple sensors detected, cannot auto-select")
101                .suggestion("specify one with -s <url>"));
102        }
103        valid_urls[0].clone()
104    };
105
106    let subtree = proxy_cli.subtree;
107
108    println!("tio proxy starting:");
109    println!(
110        "  Sensor: {} {}",
111        sensor_url,
112        if auto_detected { "(auto-detected)" } else { "" }
113    );
114    println!("  TCP port: {}", tcp_port);
115    println!("  Subtree: {}", subtree);
116    if verbose || debugging || dump_traffic || dump_data || dump_meta || dump_hb {
117        print!("  Flags:");
118        if verbose {
119            print!(" verbose");
120        }
121        if debugging {
122            print!(" debug");
123        }
124        if disconnect_slow {
125            print!(" kick-slow");
126        }
127        if dump_traffic {
128            print!(" dump");
129        }
130        if dump_data {
131            print!(" dump-data");
132        }
133        if dump_meta {
134            print!(" dump-meta");
135        }
136        if dump_hb {
137            print!(" dump-hb");
138        }
139        println!();
140    }
141    println!();
142
143    let new_client = {
144        let (client_send, new_client) = crossbeam::channel::bounded::<std::net::TcpStream>(10);
145        let started_v6 = create_listener_thread(
146            std::net::SocketAddr::new(
147                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
148                tcp_port,
149            ),
150            client_send.clone(),
151        );
152        let started_v4 = if let (Ok(()), false) = (&started_v6, cfg!(windows)) {
153            // If v6 started correctly and we are not in windows, pretend
154            // v4 also started correctly. The OS will pass the new clients
155            // through the v6 socket.
156            Ok(())
157        } else {
158            create_listener_thread(
159                std::net::SocketAddr::new(
160                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
161                    tcp_port,
162                ),
163                client_send.clone(),
164            )
165        };
166        if let (Err(e1), Err(e2)) = (started_v6, started_v4) {
167            let addr_in_use = matches!(e1.kind(), io::ErrorKind::AddrInUse)
168                || matches!(e2.kind(), io::ErrorKind::AddrInUse);
169            let err = eyre::eyre!("could not bind TCP port {}: v6={}, v4={}", tcp_port, e1, e2);
170            return Err(if addr_in_use {
171                err.suggestion(format!(
172                    "another 'tio proxy' is likely running on port {}; try --port <N>",
173                    tcp_port
174                ))
175            } else {
176                err
177            });
178        }
179        new_client
180    };
181
182    let (status_send, port_status) = crossbeam::channel::bounded::<proxy::Event>(100);
183    let proxy =
184        proxy::Interface::new_proxy(&sensor_url, Some(reconnect_timeout), Some(status_send));
185
186    // This is used by the proxy itself to communicate with the device tree.
187    // for now only used to receive log messages and dump traffic.
188    let proxy_port = match proxy.subtree_full(subtree.clone()) {
189        Ok(port) => port,
190        Err(e) => {
191            let last_status = port_status.iter().last();
192            let err =
193                eyre::Report::new(e).wrap_err(format!("could not open port on {}", sensor_url));
194            return Err(if let Some(status) = last_status {
195                err.with_section(move || format!("{:?}", status).header("Last proxy event:"))
196            } else {
197                err
198            });
199        }
200    };
201
202    use crossbeam::select;
203    loop {
204        select! {
205            recv(new_client) -> tcp_client => {
206                if let Ok(stream) = tcp_client {
207                    let addr = match stream.peer_addr() {
208                        Ok(addr) => addr.to_string(),
209                        Err(err) => {
210                            log!(tf, "Failed to determine client address: {:?}", err);
211                            continue;
212                        }
213                    };
214                    // A client from the proxy perspective is a port in reverse, i.e. what it receives
215                    // is what a client transmits, and vice-versa. Therefore, the channel size settings
216                    // for rx and tx are inverted. Also, we use the proxy port channel size setting
217                    // instead of the physical ports setting.
218                    let (rx_send, client_rx) = tio::port::Port::rx_channel_custom(proxy::Interface::get_client_tx_channel_size());
219                    let client = match tio::port::Port::from_tcp_stream_custom(stream, tio::port::Port::rx_to_channel(rx_send), proxy::Interface::get_client_rx_channel_size()) {
220                        Ok(client_port) => client_port,
221                        _ => continue,
222                    };
223
224                    if verbose {
225                        log!(tf, "Accepted client from {}", addr);
226                    }
227                    let port = proxy.new_port(Some(Duration::from_millis(2000)), subtree.clone(), usize::MAX, true, true).expect("Failed to create new proxy port");
228                    let tf = tf.clone();
229                    std::thread::spawn(move || {
230                        let mut is_slow = false;
231                        let mut dropped: usize = 0;
232                        loop {
233                            select! {
234                                recv(port.receiver()) -> res => {
235                                    let pkt = if let Ok(pkt) = res { pkt } else {
236                                        log!(tf, "Disconnecting client {} due to internal error receiving tio data in thread", addr);
237                                            break;
238                                    };
239                                    if dump_traffic {
240                                        if match pkt.payload {
241                                            proto::Payload::RpcRequest(_) | proto::Payload::RpcReply(_) | proto::Payload::RpcError(_) => true,
242                                            _ => false,
243                                        } {
244                                            log!(tf, "{}->{} -- {:?}", pkt.routing, addr, pkt.payload);
245                                        }
246                                    }
247                                    match client.try_send(pkt) {
248                                        Err(tio::SendError::Full) => {
249                                            if disconnect_slow {
250                                                log!(tf, "Disconnecting client {} due to slowness", addr);
251                                                break;
252                                            } else if verbose {
253                                                if !is_slow {
254                                                    is_slow = true;
255                                                    log!(tf, "Client {} is not keeping up and is dropping packets", addr);
256                                                }
257                                                dropped += 1;
258                                            }
259                                        }
260                                        Ok(()) => {
261                                            if verbose && is_slow {
262                                                log!(tf, "Client {} resuming after having dropped {} packets", addr, dropped);
263                                                is_slow = false;
264                                                dropped = 0;
265                                            }
266                                        }
267                                        _ => {
268                                            if verbose {
269                                                log!(tf, "Client {} exiting", addr);
270                                            }
271                                            break;
272                                        }
273                                    }
274                                }
275                                recv(client_rx) -> res => {
276                                    match res {
277                                        Ok(Ok(pkt)) => {
278                                            if dump_traffic {
279                                                log!(tf, "{}->{} -- {:?}", addr, pkt.routing, pkt.payload);
280                                            }
281                                            if let Err(_) = port.try_send(pkt) {
282                                                log!(tf, "Disconnecting client {} due to internal error forwarding tio data in thread", addr);
283                                                    break;
284                                            }
285                                        }
286                                        _ => {
287                                            if verbose {
288                                                log!(tf, "Client {} exiting", addr);
289                                            }
290                                            break;
291                                        }
292                                    }
293                                }
294                            }
295                        }
296                    });
297                } else {
298                    bail!("listener thread died unexpectedly");
299                }
300            }
301            recv(port_status) -> status => {
302                if let Ok(evt) = status {
303                    match evt {
304                        proxy::Event::SensorDisconnected => {
305                            log!(tf, "Sensor disconnected");
306                        }
307                        proxy::Event::SensorReconnected => {
308                            log!(tf, "Sensor reconnected");
309                        }
310                        proxy::Event::FailedToReconnect => {
311                            log!(tf, "Stopping reconnection attempts due to timeout");
312                        }
313                        proxy::Event::FailedToConnect => {
314                            log!(tf, "Fatal proxy error: failed to connect to sensor");
315                        }
316                        proxy::Event::FatalError(err) => {
317                            log!(tf, "Fatal proxy error: {:?}", err);
318                            // the proxy thread will exit and we'll detect it at the next iteration.
319                        }
320                        proxy::Event::ProtocolError(perr) => {
321                            match perr {
322                                proto::Error::Text(txt) => {
323                                    log!(tf, "Text: {}", txt);
324                                }
325                                other => {
326                                    if verbose || debugging {
327                                        log!(tf, "Protocol error: {:?}", other);
328                                    }
329                                }
330                            }
331                        }
332                        evt => {
333                            if debugging {
334                                log!(tf, "Proxy event: {:?}", evt)
335                            }
336                        }
337                    }
338                } else {
339                    // The proxy thread died, most likely due to the sensor
340                    // getting disconnected past the autoreconnection
341                    break;
342                }
343            }
344            recv(proxy_port.receiver()) -> pkt_or_err => {
345                if let Ok(pkt) = pkt_or_err {
346                    let dump = match pkt.payload {
347                        proto::Payload::Heartbeat(_) => dump_hb,
348                        proto::Payload::Metadata(_) => dump_meta,
349                        proto::Payload::StreamData(_) => dump_data,
350                        _ => dump_traffic
351                    };
352                    if dump {
353                        log!(tf, "Packet from {} -- {:?}", pkt.routing, pkt.payload);
354                    }
355                    if let proto::Payload::LogMessage(log) = pkt.payload {
356                        log!(tf, "{} {:?}: {}", pkt.routing, log.level, log.message);
357                    }
358                }
359            }
360        }
361    }
362    Ok(())
363}