1use crate::ProxyCli;
7use std::io;
8use std::net::TcpListener;
9use std::time::Duration;
10use tio::{proto, proxy};
11use twinleaf::device::discovery::{self, PortInterface};
12use twinleaf::tio;
13
14macro_rules! log{
15 ($tf:expr, $msg:expr)=>{
16 {
17 println!("{}{}", chrono::Local::now().format(&$tf), $msg);
18 }
19 };
20 ($tf:expr, $f:expr,$($a:tt)*)=>{
21 {
22 log!($tf, format!($f, $($a)*));
23 }
24 };
25}
26
27fn create_listener_thread(
28 addr: std::net::SocketAddr,
29 client_send: crossbeam::channel::Sender<std::net::TcpStream>,
30) -> io::Result<()> {
31 let listener = TcpListener::bind(addr)?;
32 std::thread::Builder::new()
33 .name("listener".to_string())
34 .spawn(move || {
35 for res in listener.incoming() {
36 match res {
37 Ok(stream) => client_send.send(stream).expect("New client queue full"),
38 Err(err) => eprintln!("error accepting client: {}", err),
39 };
40 }
41 })?;
42 Ok(())
43}
44
45pub fn run_proxy(proxy_cli: ProxyCli) -> eyre::Result<()> {
46 use color_eyre::{Help, SectionExt};
47 use eyre::bail;
48
49 if proxy_cli.enumerate {
51 return crate::tools::list::list_devices_deprecated(true);
52 }
53
54 if proxy_cli.auto {
55 eprintln!(
56 "warning: '--auto' is deprecated; running without -s <url> now auto-detects by default"
57 );
58 }
59
60 let tcp_port = proxy_cli.port;
61 let reconnect_timeout = Duration::from_secs(proxy_cli.reconnect_timeout);
62 let disconnect_slow = proxy_cli.kick_slow;
63 let verbose = proxy_cli.verbose;
64 let debugging = proxy_cli.debug;
65 let dump_traffic = proxy_cli.dump;
66 let dump_data = proxy_cli.dump_data;
67 let dump_meta = proxy_cli.dump_meta;
68 let dump_hb = proxy_cli.dump_hb;
69 let tf = proxy_cli.timestamp_format;
70
71 let auto_detected = proxy_cli.sensor_url.is_none();
73 let sensor_url = if let Some(url) = proxy_cli.sensor_url {
74 url
75 } else {
76 let devices = discovery::enumerate_serial(false);
78 let mut valid_urls = Vec::new();
79 for dev in devices {
80 match dev.interface {
81 PortInterface::STM32 | PortInterface::FTDI => {
82 valid_urls.push(dev.url.clone());
83 }
84 _ => {}
85 }
86 }
87 if valid_urls.len() == 0 {
88 return Err(eyre::eyre!("no sensors detected")
89 .suggestion("specify a URL with -s <url>, or run 'tio list'"));
90 }
91 if valid_urls.len() > 1 {
92 eprintln!("multiple sensors detected:");
93 let query_timeout = Duration::from_millis(500);
94 for url in &valid_urls {
95 match discovery::query_name(url, query_timeout) {
96 Some(name) => eprintln!(" {} {}", url, name),
97 None => eprintln!(" {} (no response)", url),
98 }
99 }
100 return Err(eyre::eyre!("multiple sensors detected, cannot auto-select")
101 .suggestion("specify one with -s <url>"));
102 }
103 valid_urls[0].clone()
104 };
105
106 let subtree = proxy_cli.subtree;
107
108 println!("tio proxy starting:");
109 println!(
110 " Sensor: {} {}",
111 sensor_url,
112 if auto_detected { "(auto-detected)" } else { "" }
113 );
114 println!(" TCP port: {}", tcp_port);
115 println!(" Subtree: {}", subtree);
116 if verbose || debugging || dump_traffic || dump_data || dump_meta || dump_hb {
117 print!(" Flags:");
118 if verbose {
119 print!(" verbose");
120 }
121 if debugging {
122 print!(" debug");
123 }
124 if disconnect_slow {
125 print!(" kick-slow");
126 }
127 if dump_traffic {
128 print!(" dump");
129 }
130 if dump_data {
131 print!(" dump-data");
132 }
133 if dump_meta {
134 print!(" dump-meta");
135 }
136 if dump_hb {
137 print!(" dump-hb");
138 }
139 println!();
140 }
141 println!();
142
143 let new_client = {
144 let (client_send, new_client) = crossbeam::channel::bounded::<std::net::TcpStream>(10);
145 let started_v6 = create_listener_thread(
146 std::net::SocketAddr::new(
147 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
148 tcp_port,
149 ),
150 client_send.clone(),
151 );
152 let started_v4 = if let (Ok(()), false) = (&started_v6, cfg!(windows)) {
153 Ok(())
157 } else {
158 create_listener_thread(
159 std::net::SocketAddr::new(
160 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
161 tcp_port,
162 ),
163 client_send.clone(),
164 )
165 };
166 if let (Err(e1), Err(e2)) = (started_v6, started_v4) {
167 let addr_in_use = matches!(e1.kind(), io::ErrorKind::AddrInUse)
168 || matches!(e2.kind(), io::ErrorKind::AddrInUse);
169 let err = eyre::eyre!("could not bind TCP port {}: v6={}, v4={}", tcp_port, e1, e2);
170 return Err(if addr_in_use {
171 err.suggestion(format!(
172 "another 'tio proxy' is likely running on port {}; try --port <N>",
173 tcp_port
174 ))
175 } else {
176 err
177 });
178 }
179 new_client
180 };
181
182 let (status_send, port_status) = crossbeam::channel::bounded::<proxy::Event>(100);
183 let proxy =
184 proxy::Interface::new_proxy(&sensor_url, Some(reconnect_timeout), Some(status_send));
185
186 let proxy_port = match proxy.subtree_full(subtree.clone()) {
189 Ok(port) => port,
190 Err(e) => {
191 let last_status = port_status.iter().last();
192 let err =
193 eyre::Report::new(e).wrap_err(format!("could not open port on {}", sensor_url));
194 return Err(if let Some(status) = last_status {
195 err.with_section(move || format!("{:?}", status).header("Last proxy event:"))
196 } else {
197 err
198 });
199 }
200 };
201
202 use crossbeam::select;
203 loop {
204 select! {
205 recv(new_client) -> tcp_client => {
206 if let Ok(stream) = tcp_client {
207 let addr = match stream.peer_addr() {
208 Ok(addr) => addr.to_string(),
209 Err(err) => {
210 log!(tf, "Failed to determine client address: {:?}", err);
211 continue;
212 }
213 };
214 let (rx_send, client_rx) = tio::port::Port::rx_channel_custom(proxy::Interface::get_client_tx_channel_size());
219 let client = match tio::port::Port::from_tcp_stream_custom(stream, tio::port::Port::rx_to_channel(rx_send), proxy::Interface::get_client_rx_channel_size()) {
220 Ok(client_port) => client_port,
221 _ => continue,
222 };
223
224 if verbose {
225 log!(tf, "Accepted client from {}", addr);
226 }
227 let port = proxy.new_port(Some(Duration::from_millis(2000)), subtree.clone(), usize::MAX, true, true).expect("Failed to create new proxy port");
228 let tf = tf.clone();
229 std::thread::spawn(move || {
230 let mut is_slow = false;
231 let mut dropped: usize = 0;
232 loop {
233 select! {
234 recv(port.receiver()) -> res => {
235 let pkt = if let Ok(pkt) = res { pkt } else {
236 log!(tf, "Disconnecting client {} due to internal error receiving tio data in thread", addr);
237 break;
238 };
239 if dump_traffic {
240 if match pkt.payload {
241 proto::Payload::RpcRequest(_) | proto::Payload::RpcReply(_) | proto::Payload::RpcError(_) => true,
242 _ => false,
243 } {
244 log!(tf, "{}->{} -- {:?}", pkt.routing, addr, pkt.payload);
245 }
246 }
247 match client.try_send(pkt) {
248 Err(tio::SendError::Full) => {
249 if disconnect_slow {
250 log!(tf, "Disconnecting client {} due to slowness", addr);
251 break;
252 } else if verbose {
253 if !is_slow {
254 is_slow = true;
255 log!(tf, "Client {} is not keeping up and is dropping packets", addr);
256 }
257 dropped += 1;
258 }
259 }
260 Ok(()) => {
261 if verbose && is_slow {
262 log!(tf, "Client {} resuming after having dropped {} packets", addr, dropped);
263 is_slow = false;
264 dropped = 0;
265 }
266 }
267 _ => {
268 if verbose {
269 log!(tf, "Client {} exiting", addr);
270 }
271 break;
272 }
273 }
274 }
275 recv(client_rx) -> res => {
276 match res {
277 Ok(Ok(pkt)) => {
278 if dump_traffic {
279 log!(tf, "{}->{} -- {:?}", addr, pkt.routing, pkt.payload);
280 }
281 if let Err(_) = port.try_send(pkt) {
282 log!(tf, "Disconnecting client {} due to internal error forwarding tio data in thread", addr);
283 break;
284 }
285 }
286 _ => {
287 if verbose {
288 log!(tf, "Client {} exiting", addr);
289 }
290 break;
291 }
292 }
293 }
294 }
295 }
296 });
297 } else {
298 bail!("listener thread died unexpectedly");
299 }
300 }
301 recv(port_status) -> status => {
302 if let Ok(evt) = status {
303 match evt {
304 proxy::Event::SensorDisconnected => {
305 log!(tf, "Sensor disconnected");
306 }
307 proxy::Event::SensorReconnected => {
308 log!(tf, "Sensor reconnected");
309 }
310 proxy::Event::FailedToReconnect => {
311 log!(tf, "Stopping reconnection attempts due to timeout");
312 }
313 proxy::Event::FailedToConnect => {
314 log!(tf, "Fatal proxy error: failed to connect to sensor");
315 }
316 proxy::Event::FatalError(err) => {
317 log!(tf, "Fatal proxy error: {:?}", err);
318 }
320 proxy::Event::ProtocolError(perr) => {
321 match perr {
322 proto::Error::Text(txt) => {
323 log!(tf, "Text: {}", txt);
324 }
325 other => {
326 if verbose || debugging {
327 log!(tf, "Protocol error: {:?}", other);
328 }
329 }
330 }
331 }
332 evt => {
333 if debugging {
334 log!(tf, "Proxy event: {:?}", evt)
335 }
336 }
337 }
338 } else {
339 break;
342 }
343 }
344 recv(proxy_port.receiver()) -> pkt_or_err => {
345 if let Ok(pkt) = pkt_or_err {
346 let dump = match pkt.payload {
347 proto::Payload::Heartbeat(_) => dump_hb,
348 proto::Payload::Metadata(_) => dump_meta,
349 proto::Payload::StreamData(_) => dump_data,
350 _ => dump_traffic
351 };
352 if dump {
353 log!(tf, "Packet from {} -- {:?}", pkt.routing, pkt.payload);
354 }
355 if let proto::Payload::LogMessage(log) = pkt.payload {
356 log!(tf, "{} {:?}: {}", pkt.routing, log.level, log.message);
357 }
358 }
359 }
360 }
361 }
362 Ok(())
363}