1use crate::{MountArg, ProxyCli, ProxySubcommands};
8use std::io;
9use std::net::TcpListener;
10use std::time::Duration;
11use twinleaf::device::discovery::{self, PortInterface};
12use twinleaf::tio::{self, proto, proxy};
13
14fn init_proxy_logging(verbose: bool, debug: bool) {
15 use std::io::Write;
16 let level_filter = if debug {
17 "trace"
18 } else if verbose {
19 "debug"
20 } else {
21 "info,device=debug"
22 };
23 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(level_filter))
24 .format(|buf, record| {
25 let level = record.level();
26 let level_style = buf.default_level_style(level);
27 let target = record.target();
28 let source = target
29 .strip_prefix("device::")
30 .unwrap_or_else(|| target.rsplit("::").next().unwrap_or(target));
31 let bold = env_logger::fmt::style::Style::new().bold();
32 let ts = chrono::Local::now().format("%T%.3f");
33 writeln!(
34 buf,
35 "{ts} {level_style}{level:5}{level_style:#} {bold}{source}:{bold:#} {}",
36 record.args()
37 )
38 })
39 .init();
40}
41
42pub fn run_proxy(mut proxy_cli: ProxyCli) -> eyre::Result<()> {
43 match proxy_cli.subcommands.take() {
44 Some(ProxySubcommands::Nmea { tio, tcp_port }) => {
45 init_proxy_logging(false, false);
46 crate::tools::proxy_nmea::run_nmea_proxy(tio, tcp_port)
47 }
48 None => {
49 init_proxy_logging(proxy_cli.verbose, proxy_cli.debug);
50 if proxy_cli.timestamp_format != "%T%.3f " {
51 log::warn!(
52 "--timestamp is deprecated and no longer applied; \
53 timestamps are emitted by the logger"
54 );
55 }
56 if proxy_cli.enumerate {
57 return crate::tools::list::list_devices_deprecated(true);
58 }
59 if proxy_cli.auto {
60 log::warn!(
61 "'--auto' is deprecated; running without a URL now auto-detects by default"
62 );
63 }
64 let mounts = std::mem::take(&mut proxy_cli.mounts);
65 let layout = Layout::from_cli(mounts, proxy_cli.sensor_url.take())?;
66 let server = ProxyServer {
67 config: ProxyConfig::from(&proxy_cli),
68 layout,
69 };
70 server.run()
71 }
72 }
73}
74
75#[derive(Debug, Clone)]
77struct ProxyConfig {
78 tcp_port: u16,
79 reconnect_timeout: Duration,
80 disconnect_slow: bool,
81 verbose: bool,
82 debugging: bool,
83 subtree: proto::DeviceRoute,
84 dump_traffic: bool,
85 dump_data: bool,
86 dump_meta: bool,
87 dump_hb: bool,
88}
89
90impl From<&ProxyCli> for ProxyConfig {
91 fn from(cli: &ProxyCli) -> Self {
92 Self {
93 tcp_port: cli.port,
94 reconnect_timeout: Duration::from_secs(cli.reconnect_timeout),
95 disconnect_slow: cli.kick_slow,
96 verbose: cli.verbose,
97 debugging: cli.debug,
98 subtree: cli.subtree.clone(),
99 dump_traffic: cli.dump,
100 dump_data: cli.dump_data,
101 dump_meta: cli.dump_meta,
102 dump_hb: cli.dump_hb,
103 }
104 }
105}
106
107#[derive(Debug, Clone)]
110struct Mount {
111 locator: String,
112 prefix: proto::DeviceRoute,
113 auto_detected: bool,
114}
115
116#[derive(Debug, Clone)]
117struct Layout {
118 mounts: Vec<Mount>,
119}
120
121impl Layout {
122 fn from_cli(mount_args: Vec<MountArg>, sensor_url: Option<String>) -> eyre::Result<Layout> {
123 if mount_args.is_empty() {
124 return Ok(Layout {
125 mounts: vec![resolve_root_mount(sensor_url)?],
126 });
127 }
128 let mut prefixes = std::collections::HashSet::new();
129 for arg in &mount_args {
130 if !prefixes.insert(arg.prefix.clone()) {
131 return Err(eyre::eyre!("duplicate mount prefix {}", arg.prefix));
132 }
133 }
134 Ok(Layout {
135 mounts: mount_args
136 .into_iter()
137 .map(|arg| Mount {
138 locator: arg.locator,
139 prefix: arg.prefix,
140 auto_detected: false,
141 })
142 .collect(),
143 })
144 }
145}
146
147fn resolve_root_mount(sensor_url: Option<String>) -> eyre::Result<Mount> {
150 use color_eyre::Help;
151
152 let auto_detected = sensor_url.is_none();
153 let locator = if let Some(url) = sensor_url {
154 url
155 } else {
156 let devices = discovery::enumerate_serial(false);
157 let mut valid_urls = Vec::new();
158 for dev in devices {
159 match dev.interface {
160 PortInterface::STM32 | PortInterface::FTDI => {
161 valid_urls.push(dev.url.clone());
162 }
163 _ => {}
164 }
165 }
166 if valid_urls.is_empty() {
167 return Err(eyre::eyre!("no sensors detected")
168 .suggestion("specify a URL with -s <url>, or run 'tio list'"));
169 }
170 if valid_urls.len() > 1 {
171 eprintln!("multiple sensors detected:");
172 let query_timeout = Duration::from_millis(500);
173 for url in &valid_urls {
174 match discovery::query_name(url, query_timeout) {
175 Some(name) => eprintln!(" {} {}", url, name),
176 None => eprintln!(" {} (no response)", url),
177 }
178 }
179 return Err(eyre::eyre!("multiple sensors detected, cannot auto-select")
180 .suggestion("specify one with -s <url>")
181 .suggestion("or mount each at a route prefix with --mount <url>=/N"));
182 }
183 valid_urls.swap_remove(0)
184 };
185
186 Ok(Mount {
187 locator,
188 prefix: proto::DeviceRoute::root(),
189 auto_detected,
190 })
191}
192
193struct DeviceLink {
196 prefix: proto::DeviceRoute,
197 interface: proxy::Interface,
198 status_rx: crossbeam::channel::Receiver<proxy::Event>,
199 monitor_port: proxy::Port,
200}
201
202#[derive(Clone, Copy)]
205enum Source<'a> {
206 NewClient,
207 Status(&'a DeviceLink),
208 DevicePacket(&'a DeviceLink),
209}
210
211enum Disconnect {
213 ClientClosed,
214 TooSlow,
215 PortReceiveFailed,
216 PortForwardFailed,
217}
218
219fn is_rpc(payload: &proto::Payload) -> bool {
220 matches!(
221 payload,
222 proto::Payload::RpcRequest(_) | proto::Payload::RpcReply(_) | proto::Payload::RpcError(_)
223 )
224}
225
226#[derive(Default)]
229struct SlowTracker {
230 is_slow: bool,
231 dropped: usize,
232}
233
234impl SlowTracker {
235 fn packet_dropped(&mut self, addr: &str) {
236 if !log::log_enabled!(log::Level::Debug) {
237 return;
238 }
239 if !self.is_slow {
240 self.is_slow = true;
241 log::debug!("Client {} is not keeping up and is dropping packets", addr);
242 }
243 self.dropped += 1;
244 }
245
246 fn packet_delivered(&mut self, addr: &str) {
247 if self.is_slow {
248 log::debug!(
249 "Client {} resuming after having dropped {} packets",
250 addr,
251 self.dropped
252 );
253 self.is_slow = false;
254 self.dropped = 0;
255 }
256 }
257}
258
259struct ProxyServer {
262 config: ProxyConfig,
263 layout: Layout,
264}
265
266impl ProxyServer {
267 fn run(self) -> eyre::Result<()> {
268 use color_eyre::{Help, SectionExt};
269 use eyre::bail;
270
271 self.print_startup();
272
273 let new_client = self.start_listeners()?;
274
275 let mut links = Vec::with_capacity(self.layout.mounts.len());
276 for mount in &self.layout.mounts {
277 let (status_send, status_rx) = crossbeam::channel::bounded::<proxy::Event>(100);
278 let interface = proxy::Interface::new_proxy(
279 &mount.locator,
280 Some(self.config.reconnect_timeout),
281 Some(status_send),
282 );
283 let monitor_port = match interface.subtree_full(self.config.subtree.clone()) {
286 Ok(port) => port,
287 Err(e) => {
288 let last_status = status_rx.iter().last();
289 let err = eyre::Report::new(e)
290 .wrap_err(format!("could not open port on {}", mount.locator));
291 return Err(if let Some(status) = last_status {
292 err.with_section(move || {
293 format!("{:?}", status).header("Last proxy event:")
294 })
295 } else {
296 err
297 });
298 }
299 };
300 links.push(DeviceLink {
301 prefix: mount.prefix.clone(),
302 interface,
303 status_rx,
304 monitor_port,
305 });
306 }
307
308 let mut sel = crossbeam::channel::Select::new();
309 let mut sources = Vec::with_capacity(1 + 2 * links.len());
310 sel.recv(&new_client);
311 sources.push(Source::NewClient);
312 for link in &links {
313 sel.recv(&link.status_rx);
314 sources.push(Source::Status(link));
315 sel.recv(link.monitor_port.receiver());
316 sources.push(Source::DevicePacket(link));
317 }
318
319 loop {
320 let oper = sel.select();
321 match sources[oper.index()] {
322 Source::NewClient => {
323 let Ok(stream) = oper.recv(&new_client) else {
324 bail!("listener thread died unexpectedly");
325 };
326 self.accept_client(stream, &links);
327 }
328 Source::Status(link) => {
329 let Ok(evt) = oper.recv(&link.status_rx) else {
330 break;
333 };
334 log_proxy_event(evt, &link.prefix);
335 }
336 Source::DevicePacket(link) => {
337 let Ok(pkt) = oper.recv(link.monitor_port.receiver()) else {
338 break;
339 };
340 self.log_device_packet(pkt, &link.prefix);
341 }
342 }
343 }
344 Ok(())
345 }
346
347 fn print_startup(&self) {
348 println!("tio proxy starting:");
349 let mounts = &self.layout.mounts;
350 if mounts.len() == 1 && mounts[0].prefix.len() == 0 {
351 println!(
352 " Sensor: {}{}",
353 mounts[0].locator,
354 if mounts[0].auto_detected {
355 " (auto-detected)"
356 } else {
357 ""
358 }
359 );
360 } else {
361 println!(" Mounts:");
362 for mount in mounts {
363 println!(" {} {}", mount.prefix, mount.locator);
364 }
365 }
366 println!(" TCP port: {}", self.config.tcp_port);
367 println!(" Subtree: {}", self.config.subtree);
368
369 let flags = [
370 ("verbose", self.config.verbose),
371 ("debug", self.config.debugging),
372 ("kick-slow", self.config.disconnect_slow),
373 ("dump", self.config.dump_traffic),
374 ("dump-data", self.config.dump_data),
375 ("dump-meta", self.config.dump_meta),
376 ("dump-hb", self.config.dump_hb),
377 ];
378 let enabled: Vec<&str> = flags
379 .iter()
380 .filter_map(|&(name, on)| on.then_some(name))
381 .collect();
382 if !enabled.is_empty() {
383 println!(" Flags: {}", enabled.join(" "));
384 }
385 println!();
386 }
387
388 fn start_listeners(&self) -> eyre::Result<crossbeam::channel::Receiver<std::net::TcpStream>> {
389 use color_eyre::Help;
390
391 let (client_send, new_client) = crossbeam::channel::bounded::<std::net::TcpStream>(10);
392 let started_v6 = create_listener_thread(
393 std::net::SocketAddr::new(
394 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
395 self.config.tcp_port,
396 ),
397 client_send.clone(),
398 );
399 let started_v4 = if let (Ok(()), false) = (&started_v6, cfg!(windows)) {
400 Ok(())
404 } else {
405 create_listener_thread(
406 std::net::SocketAddr::new(
407 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
408 self.config.tcp_port,
409 ),
410 client_send.clone(),
411 )
412 };
413 if let (Err(e1), Err(e2)) = (started_v6, started_v4) {
414 let addr_in_use = matches!(e1.kind(), io::ErrorKind::AddrInUse)
415 || matches!(e2.kind(), io::ErrorKind::AddrInUse);
416 let err = eyre::eyre!(
417 "could not bind TCP port {}: v6={}, v4={}",
418 self.config.tcp_port,
419 e1,
420 e2
421 );
422 return Err(if addr_in_use {
423 err.suggestion(format!(
424 "another 'tio proxy' is likely running on port {}; try --port <N>",
425 self.config.tcp_port
426 ))
427 } else {
428 err
429 });
430 }
431 Ok(new_client)
432 }
433
434 fn accept_client(&self, stream: std::net::TcpStream, links: &[DeviceLink]) {
435 let addr = match stream.peer_addr() {
436 Ok(addr) => addr.to_string(),
437 Err(err) => {
438 log::warn!("Failed to determine client address: {:?}", err);
439 return;
440 }
441 };
442 let (rx_send, client_rx) =
447 tio::port::Port::rx_channel_custom(proxy::Interface::get_client_tx_channel_size());
448 let client = match tio::port::Port::from_tcp_stream_custom(
449 stream,
450 tio::port::Port::rx_to_channel(rx_send),
451 proxy::Interface::get_client_rx_channel_size(),
452 ) {
453 Ok(client_port) => client_port,
454 _ => return,
455 };
456
457 log::debug!("Accepted client from {}", addr);
458 let mut ports = Vec::with_capacity(links.len());
459 for link in links {
460 let port = link
461 .interface
462 .new_port(
463 Some(Duration::from_millis(2000)),
464 self.config.subtree.clone(),
465 usize::MAX,
466 true,
467 true,
468 )
469 .expect("Failed to create new proxy port");
470 ports.push((link.prefix.clone(), port));
471 }
472
473 let dump_traffic = self.config.dump_traffic;
474 let disconnect_slow = self.config.disconnect_slow;
475 std::thread::spawn(move || {
476 let mut slow = SlowTracker::default();
477
478 let mut sel = crossbeam::channel::Select::new();
480 sel.recv(&client_rx);
481 for (_, port) in &ports {
482 sel.recv(port.receiver());
483 }
484
485 let reason = loop {
486 let oper = sel.select();
487 match oper.index() {
488 0 => {
489 let Ok(Ok(mut pkt)) = oper.recv(&client_rx) else {
490 break Disconnect::ClientClosed;
491 };
492 if dump_traffic {
493 log::info!("{}->{} -- {:?}", addr, pkt.routing, pkt.payload);
494 }
495 let mut dest = None;
496 for (prefix, port) in &ports {
497 if let Ok(relative) = prefix.relative_route(&pkt.routing) {
498 dest = Some((relative, port));
499 break;
500 }
501 }
502 let Some((relative, port)) = dest else {
503 log::debug!(
504 "Client {} addressed unmounted route {}",
505 addr,
506 pkt.routing
507 );
508 continue;
509 };
510 pkt.routing = relative;
511 if port.try_send(pkt).is_err() {
512 break Disconnect::PortForwardFailed;
513 }
514 }
515 i => {
516 let (prefix, port) = &ports[i - 1];
517 let Ok(mut pkt) = oper.recv(port.receiver()) else {
518 break Disconnect::PortReceiveFailed;
519 };
520 pkt.routing = prefix.absolute_route(&pkt.routing);
521 if pkt.routing.len() > proto::TIO_PACKET_MAX_ROUTING_SIZE {
522 log::warn!(
523 "Dropping packet for client {}: route {} exceeds max depth",
524 addr,
525 pkt.routing
526 );
527 continue;
528 }
529 if dump_traffic && is_rpc(&pkt.payload) {
530 log::info!("{}->{} -- {:?}", pkt.routing, addr, pkt.payload);
531 }
532 match client.try_send(pkt) {
533 Ok(()) => slow.packet_delivered(&addr),
534 Err(tio::SendError::Full) if !disconnect_slow => {
535 slow.packet_dropped(&addr)
536 }
537 Err(tio::SendError::Full) => break Disconnect::TooSlow,
538 Err(_) => break Disconnect::ClientClosed,
539 }
540 }
541 }
542 };
543
544 match reason {
545 Disconnect::ClientClosed => log::debug!("Client {} exiting", addr),
546 Disconnect::TooSlow => {
547 log::warn!("Disconnecting client {} due to slowness", addr)
548 }
549 Disconnect::PortReceiveFailed => log::warn!(
550 "Disconnecting client {} due to internal error receiving tio data in thread",
551 addr
552 ),
553 Disconnect::PortForwardFailed => log::warn!(
554 "Disconnecting client {} due to internal error forwarding tio data in thread",
555 addr
556 ),
557 }
558 });
559 }
560
561 fn log_device_packet(&self, mut pkt: proto::Packet, prefix: &proto::DeviceRoute) {
562 pkt.routing = prefix.absolute_route(&pkt.routing);
563 let dump = match pkt.payload {
564 proto::Payload::Heartbeat(_) => self.config.dump_hb,
565 proto::Payload::Metadata(_) => self.config.dump_meta,
566 proto::Payload::StreamData(_) => self.config.dump_data,
567 _ => self.config.dump_traffic,
568 };
569 if dump {
570 log::info!("Packet from {} -- {:?}", pkt.routing, pkt.payload);
571 }
572 if let proto::Payload::LogMessage(log_msg) = pkt.payload {
573 let level = match &log_msg.level {
576 proto::LogLevel::Critical | proto::LogLevel::Error => log::Level::Error,
577 proto::LogLevel::Warning => log::Level::Warn,
578 proto::LogLevel::Info => log::Level::Info,
579 proto::LogLevel::Debug => log::Level::Debug,
580 proto::LogLevel::Unknown(_) => log::Level::Info,
581 };
582 log::log!(target: &format!("device::{}", pkt.routing), level, "{}", log_msg.message);
583 }
584 }
585}
586
587fn create_listener_thread(
588 addr: std::net::SocketAddr,
589 client_send: crossbeam::channel::Sender<std::net::TcpStream>,
590) -> io::Result<()> {
591 let listener = TcpListener::bind(addr)?;
592 std::thread::Builder::new()
593 .name("listener".to_string())
594 .spawn(move || {
595 for res in listener.incoming() {
596 match res {
597 Ok(stream) => client_send.send(stream).expect("New client queue full"),
598 Err(err) => eprintln!("error accepting client: {}", err),
599 };
600 }
601 })?;
602 Ok(())
603}
604
605fn log_proxy_event(evt: proxy::Event, prefix: &proto::DeviceRoute) {
606 let target = format!("proxy::{}", prefix);
607 let target = target.as_str();
608 match evt {
609 proxy::Event::SensorDisconnected => {
610 log::warn!(target: target, "Sensor disconnected");
611 }
612 proxy::Event::SensorReconnected => {
613 log::info!(target: target, "Sensor reconnected");
614 }
615 proxy::Event::FailedToReconnect => {
616 log::error!(target: target, "Stopping reconnection attempts due to timeout");
617 }
618 proxy::Event::FailedToConnect => {
619 log::error!(target: target, "Fatal proxy error: failed to connect to sensor");
620 }
621 proxy::Event::FatalError(err) => {
622 log::error!(target: target, "Fatal proxy error: {:?}", err);
623 }
625 proxy::Event::ProtocolError(perr) => match perr {
626 proto::Error::Text(txt) => {
627 log::info!(target: target, "Text: {}", txt);
628 }
629 other => {
630 log::debug!(target: target, "Protocol error: {:?}", other);
631 }
632 },
633 evt => {
634 log::trace!(target: target, "{:?}", evt);
635 }
636 }
637}