1use log::{debug, trace, warn};
2use std::collections::LinkedList;
3use std::io::{Read, Write};
4use std::net::TcpStream;
5use std::sync::mpsc;
6
7use crate::HEADER_LEN;
8use crate::{Call, Header, Packet, Pid, Port};
9use crate::{Error, Result};
10
11#[derive(Debug)]
15pub struct PortInfo {
16 pub count: usize,
18
19 pub ports: Vec<String>,
21}
22
23#[derive(Clone, Copy, Debug)]
27pub struct Baud(pub usize);
28
29#[derive(Debug)]
31pub struct PortCaps {
32 pub rate: Baud,
34
35 pub traffic_level: Option<u8>,
39
40 pub tx_tail: u8,
42 pub tx_delay: u8,
43 pub persist: u8,
44 pub slot_time: u8,
45 pub max_frame: u8,
46
47 pub active_connections: u8,
49
50 pub bytes_per_2min: u32,
53}
54
55enum Reply {
56 Version(u16, u16), CallsignRegistration(bool), PortInfo(PortInfo), PortCaps(PortCaps), FramesOutstandingPort(Port, usize), FramesOutstandingConnection(u32), HeardStations(String), Connected(String), ConnectedData(Vec<u8>), Disconnect, MonitorConnected(Vec<u8>), MonitorSupervisory(Vec<u8>), Unproto(Vec<u8>), ConnectedSent(Vec<u8>), Raw(Vec<u8>), Unknown(Header, Vec<u8>),
74}
75
76impl Reply {
77 fn description(&self) -> String {
78 match self {
79 Reply::Disconnect => "Disconnect".to_string(),
80 Reply::ConnectedData(data) => format!("ConnectedData: {data:?}"),
81 Reply::ConnectedSent(data) => format!("ConnectedSent: {data:?}"),
82 Reply::Unproto(data) => format!("Received unproto: {data:?}"),
83 Reply::PortInfo(s) => format!("Port info: {s:?}"),
84 Reply::PortCaps(s) => format!("Port caps: {s:?}"),
85 Reply::Connected(s) => format!("Connected: {s}"),
86 Reply::Version(maj, min) => format!("Version: {maj}.{min}"),
87 Reply::Raw(_data) => "Raw".to_string(),
88 Reply::CallsignRegistration(success) => format!("Callsign registration: {success}"),
89 Reply::FramesOutstandingPort(port, n) => {
90 format!("Frames outstanding port {port:?}: {n}")
91 }
92 Reply::FramesOutstandingConnection(n) => format!("Frames outstanding connection: {n}"),
93 Reply::MonitorConnected(x) => format!("Connected packet len {}", x.len()),
94 Reply::MonitorSupervisory(x) => format!("Supervisory packet len {}", x.len()),
95 Reply::HeardStations(s) => format!("Heard stations: {s}"),
96 Reply::Unknown(h, data) => format!("Unknown reply: header={h:?} data={data:?}"),
97 }
98 }
99}
100
101fn parse_reply(header: &Header, data: &[u8]) -> Result<Reply> {
102 Ok(match header.data_kind {
104 b'R' => {
105 let major = u16::from_le_bytes(
106 data[0..2]
107 .try_into()
108 .expect("can't happen: two bytes can't be made into u16?"),
109 );
110 let minor = u16::from_le_bytes(
111 data[4..6]
112 .try_into()
113 .expect("can't happen: two bytes can't be made into u16?"),
114 );
115 Reply::Version(major, minor)
116 }
117 b'X' => Reply::CallsignRegistration(data[0] == 1),
118 b'C' => Reply::Connected(std::str::from_utf8(data).map_err(Error::other)?.to_string()),
119 b'D' => Reply::ConnectedData(data.to_vec()),
120 b'd' => Reply::Disconnect,
121 b'T' => Reply::ConnectedSent(data.to_vec()),
122 b'U' => Reply::Unproto(data.to_vec()),
123 b'G' => {
124 let s = std::str::from_utf8(data).map_err(Error::other)?;
125 let (count, ports) = {
126 let mut np = s.splitn(2, ';');
127 let count = np
128 .next()
129 .expect("TODO: custom error")
130 .parse()
131 .map_err(Error::other)?;
132 let ports = np
133 .next()
134 .expect("TODO: custom error")
135 .split(';')
136 .map(std::string::ToString::to_string)
137 .filter(|s| s != "\0")
138 .collect();
139 (count, ports)
140 };
141 Reply::PortInfo(PortInfo { count, ports })
142 }
143 b'g' => {
144 let rate = data[0];
145 let traffic_level = data[1];
146 let tx_delay = data[2];
147 let tx_tail = data[3];
148 let persist = data[4];
149 let slot_time = data[5];
150 let max_frame = data[6];
151 let active_connections = data[7];
152 let bytes_per_2min =
153 u32::from_le_bytes(data[8..12].try_into().expect("can't happen: bytes to u32"));
154
155 let traffic_level = if traffic_level == 0xff {
156 None
157 } else {
158 Some(traffic_level)
159 };
160
161 Reply::PortCaps(PortCaps {
162 rate: Baud(rate.into()),
163 traffic_level,
164 tx_delay,
165 tx_tail,
166 slot_time,
167 max_frame,
168 active_connections,
169 bytes_per_2min,
170 persist,
171 })
172 }
173 b'y' => Reply::FramesOutstandingPort(
174 Port(1),
175 usize::try_from(u32::from_le_bytes(
176 data[0..4].try_into().expect("can't happen: bytes to u32"),
177 ))
178 .expect("TODO: some error"),
179 ),
180 b'Y' => Reply::FramesOutstandingConnection(u32::from_le_bytes(
181 data[0..4].try_into().expect("can't happen: bytes to u32"),
182 )),
183 b'H' => Reply::HeardStations(std::str::from_utf8(data).map_err(Error::other)?.to_string()),
184 b'I' => Reply::MonitorConnected(data.to_vec()),
185 b'S' => Reply::MonitorSupervisory(data.to_vec()),
186 b'K' => Reply::Raw(data.to_vec()),
187 _ => Reply::Unknown(header.clone(), data.to_vec()),
188 })
189}
190
191pub struct MakeWriter {
197 port: Port,
198 pid: Pid,
199 src: Call,
200 dst: Call,
201}
202impl MakeWriter {
203 pub fn data<T: Into<Vec<u8>>>(&self, data: T) -> Result<Vec<u8>> {
209 Ok(Packet::Data {
210 port: self.port,
211 pid: self.pid,
212 src: self.src.clone(),
213 dst: self.dst.clone(),
214 data: data.into(),
215 }
216 .serialize())
217 }
218 #[must_use]
220 pub fn disconnect(&self) -> Vec<u8> {
221 Packet::Disconnect {
222 port: self.port,
223 pid: self.pid,
224 src: self.src.clone(),
225 dst: self.dst.clone(),
226 }
227 .serialize()
228 }
229}
230
231pub struct Connection<'a> {
235 port: Port,
236 connect_string: String,
237 pid: Pid,
238 src: Call,
239 dst: Call,
240 agw: &'a mut AGW,
241 disconnected: bool,
242}
243
244impl<'a> Connection<'a> {
245 fn new(
246 agw: &'a mut AGW,
247 port: Port,
248 connect_string: String,
249 pid: Pid,
250 src: Call,
251 dst: Call,
252 ) -> Self {
253 Connection {
254 port,
255 connect_string,
256 pid,
257 src,
258 dst,
259 agw,
260 disconnected: false,
261 }
262 }
263
264 #[must_use]
266 pub fn connect_string(&self) -> &str {
267 &self.connect_string
268 }
269
270 pub fn read(&mut self) -> Result<Vec<u8>> {
276 self.agw.read_connected(&self.src, &self.dst)
277 }
278
279 pub fn write(&mut self, data: &[u8]) -> Result<usize> {
285 self.agw
286 .write_connected(self.port, self.pid, &self.src, &self.dst, data)
287 }
288
289 #[must_use]
292 pub fn make_writer(&self) -> MakeWriter {
293 MakeWriter {
294 port: self.port,
295 pid: self.pid,
296 src: self.src.clone(),
297 dst: self.dst.clone(),
298 }
299 }
300
301 pub fn sender(&mut self) -> mpsc::Sender<Vec<u8>> {
305 self.agw.sender()
306 }
307
308 pub fn disconnect(&mut self) -> Result<()> {
314 if !self.disconnected {
315 debug!("disconnecting");
316 self.agw.send(
317 &Packet::Disconnect {
318 port: self.port,
319 pid: self.pid,
320 src: self.src.clone(),
321 dst: self.dst.clone(),
322 }
323 .serialize(),
324 )?;
325 self.disconnected = true;
326 }
327 Ok(())
328 }
329}
330
331impl Drop for Connection<'_> {
332 fn drop(&mut self) {
333 if let Err(e) = self.disconnect() {
334 warn!("drop-disconnection errored with {e:?}");
335 }
336 }
337}
338
339#[allow(clippy::missing_panics_doc)]
345pub fn parse_header(header: &[u8; HEADER_LEN]) -> Result<Header> {
346 let src = Call::from_bytes(&header[8..18])?;
347 let src = if src.is_empty() { None } else { Some(src) };
348 let dst = Call::from_bytes(&header[18..28])?;
349 let dst = if dst.is_empty() { None } else { Some(dst) };
350 Ok(Header::new(
351 Port(header[0]),
352 header[4],
353 Pid(header[6]),
354 src,
355 dst,
356 u32::from_le_bytes(
357 header[28..32]
358 .try_into()
359 .expect("can't happen: bytes to u32"),
360 ),
361 ))
362}
363
364pub enum Command {
366 Version,
367}
368
369pub struct AGW {
371 rx: mpsc::Receiver<(Header, Reply)>,
372
373 tx: mpsc::Sender<Vec<u8>>,
375
376 rxqueue: LinkedList<(Header, Reply)>,
380}
381
382impl AGW {
383 pub fn new(addr: &str) -> Result<AGW> {
389 debug!("Creating AGW to {addr}");
390 let (tx, rx) = mpsc::channel();
391 let (tx2, rx2) = mpsc::channel();
392 let wstream = TcpStream::connect(addr).map_err(Error::other)?;
393 let rstream = wstream.try_clone().map_err(Error::other)?;
394 let agw = AGW {
395 rx,
396 tx: tx2,
397 rxqueue: LinkedList::new(),
398 };
399 std::thread::spawn(|| {
401 if let Err(e) = Self::reader(rstream, &tx) {
402 warn!("TCP socket reader connected to AGWPE ended: {e:?}");
403 }
404 drop(tx);
405 });
406 std::thread::spawn(|| {
408 if let Err(e) = Self::writer(wstream, &rx2) {
409 warn!("TCP socket writer connected to AGWPE ended: {e:?}");
410 }
411 drop(rx2);
412 });
413 Ok(agw)
414 }
415
416 fn send(&mut self, msg: &[u8]) -> Result<()> {
417 self.tx.send(msg.to_vec()).map_err(Error::other)?;
418 Ok(())
419 }
420
421 fn sender(&mut self) -> mpsc::Sender<Vec<u8>> {
422 self.tx.clone()
423 }
424
425 fn writer(mut stream: TcpStream, rx: &mpsc::Receiver<Vec<u8>>) -> Result<()> {
426 loop {
427 let buf = rx.recv().map_err(Error::other)?;
428 let _ = stream.write(&buf).map_err(Error::other)?;
430 }
431 }
432
433 fn reader(mut stream: TcpStream, tx: &mpsc::Sender<(Header, Reply)>) -> Result<()> {
434 loop {
435 let mut header = [0_u8; HEADER_LEN];
436 stream.read_exact(&mut header)?;
437 let header = parse_header(&header)?;
438 let payload = if header.data_len > 0 {
439 let mut payload = vec![0; header.data_len as usize];
440 stream.read_exact(&mut payload)?;
441 payload
442 } else {
443 Vec::new()
444 };
445 let reply = parse_reply(&header, &payload)?;
446 trace!("Got reply: {}", reply.description());
447 let done = matches!(reply, Reply::Disconnect);
448 tx.send((header, reply)).map_err(Error::other)?;
449 if done {
450 break Ok(());
451 }
452 }
453 }
454
455 fn rx_enqueue(&mut self, h: Header, r: Reply) {
456 const WARN_LIMIT: usize = 10;
457
458 self.rxqueue.push_back((h, r));
459 let l = self.rxqueue.len();
460 if l > WARN_LIMIT {
461 warn!("AGW maxqueue length {l} > {WARN_LIMIT}");
462 }
463 }
464
465 pub fn version(&mut self) -> Result<(u16, u16)> {
471 self.send(&Packet::VersionQuery.serialize())?;
472 loop {
473 let (h, r) = self.rx.recv().map_err(Error::other)?;
474 match r {
475 Reply::Version(maj, min) => return Ok((maj, min)),
476 other => self.rx_enqueue(h, other),
477 }
478 }
479 }
480
481 pub fn frames_outstanding(&mut self, port: Port) -> Result<usize> {
483 self.send(&Packet::FramesOutstandingPortQuery(port).serialize())?;
484 loop {
485 let (h, r) = self.rx.recv().map_err(Error::other)?;
486 match r {
487 Reply::FramesOutstandingPort(p, n) if p == port => return Ok(n),
488 other => self.rx_enqueue(h, other),
489 }
490 }
491 }
492
493 pub fn port_info(&mut self) -> Result<PortInfo> {
499 self.send(&Packet::PortInfoQuery.serialize())?;
500 loop {
501 let (h, r) = self.rx.recv().map_err(Error::other)?;
502 match r {
503 Reply::PortInfo(i) => return Ok(i),
504 other => self.rx_enqueue(h, other),
505 }
506 }
507 }
508
509 pub fn port_cap(&mut self, port: Port) -> Result<PortCaps> {
515 self.send(&Packet::PortCapQuery(port).serialize())
516 .map_err(Error::other)?;
517 loop {
518 let (h, r) = self.rx.recv().map_err(Error::other)?;
519 match r {
520 Reply::PortCaps(i) => return Ok(i),
521 other => self.rx_enqueue(h, other),
522 }
523 }
524 }
525
526 pub fn unproto(
532 &mut self,
533 port: Port,
534 pid: Pid,
535 src: &Call,
536 dst: &Call,
537 data: &[u8],
538 ) -> Result<()> {
539 self.send(
540 &Packet::Unproto {
541 port,
542 pid,
543 src: src.clone(),
544 dst: dst.clone(),
545 data: data.to_vec(),
546 }
547 .serialize(),
548 )?;
549 Ok(())
550 }
551
552 pub fn register_callsign(&mut self, port: Port, pid: Pid, src: &Call) -> Result<()> {
564 debug!("Registering callsign");
565 self.send(&Packet::RegisterCallsign(port, pid, src.clone()).serialize())?;
566 Ok(())
567 }
568
569 pub fn connect<'a>(
575 &'a mut self,
576 port: Port,
577 pid: Pid,
578 src: &Call,
579 dst: &Call,
580 via: &[Call],
581 ) -> Result<Connection<'a>> {
582 if via.is_empty() {
583 self.send(
584 &Packet::Connect {
585 port,
586 pid,
587 src: src.clone(),
588 dst: dst.clone(),
589 }
590 .serialize(),
591 )?;
592 } else {
593 self.send(
594 &Packet::ConnectVia {
595 port,
596 pid,
597 src: src.clone(),
598 dst: dst.clone(),
599 via: via.to_vec(),
600 }
601 .serialize(),
602 )?;
603 todo!();
604 }
605 let connect_string;
606 loop {
607 let (head, r) = self.rx.recv().map_err(Error::other)?;
608 if (head.src.as_ref() != Some(dst)) || (head.dst.as_ref() != Some(src)) {
609 continue;
611 }
612 match r {
613 Reply::Connected(i) => {
614 connect_string = i.clone();
615 debug!("Connected from {src} to {dst} with connect string {i}");
616 break;
617 }
618 other => self.rx_enqueue(head, other),
619 }
620 }
621 Ok(Connection::new(
622 self,
623 port,
624 connect_string,
625 pid,
626 src.clone(),
627 dst.clone(),
628 ))
629 }
630
631 fn write_connected(
632 &mut self,
633 port: Port,
634 pid: Pid,
635 src: &Call,
636 dst: &Call,
637 data: &[u8],
638 ) -> Result<usize> {
639 let len = data.len();
641 if len > 0 {
642 self.send(
643 &Packet::Data {
644 port,
645 pid,
646 src: src.clone(),
647 dst: dst.clone(),
648 data: data.to_vec(),
649 }
650 .serialize(),
651 )?;
652 }
653 Ok(data.len())
654 }
655
656 fn read_connected(&mut self, me: &Call, remote: &Call) -> Result<Vec<u8>> {
657 for frame in self.rxqueue.iter().enumerate() {
659 let (n, (head, payload)) = &frame;
660 if (head.src.as_ref() != Some(remote)) || (head.dst.as_ref() != Some(me)) {
661 continue;
662 }
663 match payload {
664 Reply::ConnectedData(data) => {
665 let ret = data.clone();
666 let mut tail = self.rxqueue.split_off(*n);
667 tail.pop_front();
668 self.rxqueue.append(&mut tail);
669 return Ok(ret);
670 }
671 Reply::Disconnect => {
672 return Err(Error::msg("remote end disconnected"));
673 }
674 _ => {
675 debug!("Remote end send unexpected data {}", payload.description());
676 }
677 }
678 }
679
680 loop {
682 let (h, r) = self.rx.recv().map_err(Error::other)?;
683 match r {
684 Reply::ConnectedData(i) => return Ok(i),
685 other => self.rx_enqueue(h, other),
686 }
687 }
688 }
689}