Skip to main content

agw/
v1.rs

1use log::{debug, trace, warn};
2use std::collections::LinkedList;
3use std::io::{Read, Write};
4use std::net::TcpStream;
5use std::sync::mpsc;
6
7use crate::HEADER_LEN;
8use crate::{Call, Header, Packet, Pid, Port};
9use crate::{Error, Result};
10
11// TODO: get rid of Reply struct. It's just a subset of Packet.
12
/// Summary of the ports offered by the AGW server.
///
/// Built from the server's `G` reply, whose text payload has the form
/// `count;description;description;...`.
#[derive(Debug)]
pub struct PortInfo {
    /// Port count, parsed from the text before the first `;`.
    pub count: usize,

    /// One description per remaining `;`-separated entry. Entries that
    /// consist solely of a NUL byte are left out.
    pub ports: Vec<String>,
}
22
/// On-air baud rate of a port.
///
/// Classic AX.25 normally runs at 1200 or 9600.
#[derive(Clone, Copy, Debug)]
pub struct Baud(pub usize);
28
29/// Port capabilities.
30#[derive(Debug)]
31pub struct PortCaps {
32    /// On air baud rate.
33    pub rate: Baud,
34
35    /// Traffic level.
36    ///
37    /// `None` if port is not in autoupdate mode.
38    pub traffic_level: Option<u8>,
39
40    // TODO: get units on these.
41    pub tx_tail: u8,
42    pub tx_delay: u8,
43    pub persist: u8,
44    pub slot_time: u8,
45    pub max_frame: u8,
46
47    /// How many connections are active on this port
48    pub active_connections: u8,
49
50    /// How many bytes received in the last 2 minutes as a 32 bits (4 bytes)
51    /// integer. Updated every two minutes.
52    pub bytes_per_2min: u32,
53}
54
55enum Reply {
56    // TODO: should these actually pick up the header value subset,
57    // too, when appropriate?
58    Version(u16, u16),                  // R.
59    CallsignRegistration(bool),         // X.
60    PortInfo(PortInfo),                 // G.
61    PortCaps(PortCaps),                 // g.
62    FramesOutstandingPort(Port, usize), // y.
63    FramesOutstandingConnection(u32),   // Y.
64    HeardStations(String),              // H. TODO: parse
65    Connected(String),                  // C.
66    ConnectedData(Vec<u8>),             // D.
67    Disconnect,                         // d.
68    MonitorConnected(Vec<u8>),          // I.
69    MonitorSupervisory(Vec<u8>),        // S.
70    Unproto(Vec<u8>),                   // U.
71    ConnectedSent(Vec<u8>),             // T.
72    Raw(Vec<u8>),                       // R.
73    Unknown(Header, Vec<u8>),
74}
75
76impl Reply {
77    fn description(&self) -> String {
78        match self {
79            Reply::Disconnect => "Disconnect".to_string(),
80            Reply::ConnectedData(data) => format!("ConnectedData: {data:?}"),
81            Reply::ConnectedSent(data) => format!("ConnectedSent: {data:?}"),
82            Reply::Unproto(data) => format!("Received unproto: {data:?}"),
83            Reply::PortInfo(s) => format!("Port info: {s:?}"),
84            Reply::PortCaps(s) => format!("Port caps: {s:?}"),
85            Reply::Connected(s) => format!("Connected: {s}"),
86            Reply::Version(maj, min) => format!("Version: {maj}.{min}"),
87            Reply::Raw(_data) => "Raw".to_string(),
88            Reply::CallsignRegistration(success) => format!("Callsign registration: {success}"),
89            Reply::FramesOutstandingPort(port, n) => {
90                format!("Frames outstanding port {port:?}: {n}")
91            }
92            Reply::FramesOutstandingConnection(n) => format!("Frames outstanding connection: {n}"),
93            Reply::MonitorConnected(x) => format!("Connected packet len {}", x.len()),
94            Reply::MonitorSupervisory(x) => format!("Supervisory packet len {}", x.len()),
95            Reply::HeardStations(s) => format!("Heard stations: {s}"),
96            Reply::Unknown(h, data) => format!("Unknown reply: header={h:?} data={data:?}"),
97        }
98    }
99}
100
101fn parse_reply(header: &Header, data: &[u8]) -> Result<Reply> {
102    // TODO: confirm data len, since most replies will have fixed size.
103    Ok(match header.data_kind {
104        b'R' => {
105            let major = u16::from_le_bytes(
106                data[0..2]
107                    .try_into()
108                    .expect("can't happen: two bytes can't be made into u16?"),
109            );
110            let minor = u16::from_le_bytes(
111                data[4..6]
112                    .try_into()
113                    .expect("can't happen: two bytes can't be made into u16?"),
114            );
115            Reply::Version(major, minor)
116        }
117        b'X' => Reply::CallsignRegistration(data[0] == 1),
118        b'C' => Reply::Connected(std::str::from_utf8(data).map_err(Error::other)?.to_string()),
119        b'D' => Reply::ConnectedData(data.to_vec()),
120        b'd' => Reply::Disconnect,
121        b'T' => Reply::ConnectedSent(data.to_vec()),
122        b'U' => Reply::Unproto(data.to_vec()),
123        b'G' => {
124            let s = std::str::from_utf8(data).map_err(Error::other)?;
125            let (count, ports) = {
126                let mut np = s.splitn(2, ';');
127                let count = np
128                    .next()
129                    .expect("TODO: custom error")
130                    .parse()
131                    .map_err(Error::other)?;
132                let ports = np
133                    .next()
134                    .expect("TODO: custom error")
135                    .split(';')
136                    .map(std::string::ToString::to_string)
137                    .filter(|s| s != "\0")
138                    .collect();
139                (count, ports)
140            };
141            Reply::PortInfo(PortInfo { count, ports })
142        }
143        b'g' => {
144            let rate = data[0];
145            let traffic_level = data[1];
146            let tx_delay = data[2];
147            let tx_tail = data[3];
148            let persist = data[4];
149            let slot_time = data[5];
150            let max_frame = data[6];
151            let active_connections = data[7];
152            let bytes_per_2min =
153                u32::from_le_bytes(data[8..12].try_into().expect("can't happen: bytes to u32"));
154
155            let traffic_level = if traffic_level == 0xff {
156                None
157            } else {
158                Some(traffic_level)
159            };
160
161            Reply::PortCaps(PortCaps {
162                rate: Baud(rate.into()),
163                traffic_level,
164                tx_delay,
165                tx_tail,
166                slot_time,
167                max_frame,
168                active_connections,
169                bytes_per_2min,
170                persist,
171            })
172        }
173        b'y' => Reply::FramesOutstandingPort(
174            Port(1),
175            usize::try_from(u32::from_le_bytes(
176                data[0..4].try_into().expect("can't happen: bytes to u32"),
177            ))
178            .expect("TODO: some error"),
179        ),
180        b'Y' => Reply::FramesOutstandingConnection(u32::from_le_bytes(
181            data[0..4].try_into().expect("can't happen: bytes to u32"),
182        )),
183        b'H' => Reply::HeardStations(std::str::from_utf8(data).map_err(Error::other)?.to_string()),
184        b'I' => Reply::MonitorConnected(data.to_vec()),
185        b'S' => Reply::MonitorSupervisory(data.to_vec()),
186        b'K' => Reply::Raw(data.to_vec()),
187        _ => Reply::Unknown(header.clone(), data.to_vec()),
188    })
189}
190
191/// An object that has all the metadata needed to be able to create
192/// AGW "write some stuff on the established connection", without
193/// owning the whole connection object.
194///
195/// See examples/term.rs for example use.
196pub struct MakeWriter {
197    port: Port,
198    pid: Pid,
199    src: Call,
200    dst: Call,
201}
202impl MakeWriter {
203    /// Make the bytes of an AGW packet to send a packet of data.
204    ///
205    /// # Errors
206    ///
207    /// If given data so bad that the serialization fails.
208    pub fn data<T: Into<Vec<u8>>>(&self, data: T) -> Result<Vec<u8>> {
209        Ok(Packet::Data {
210            port: self.port,
211            pid: self.pid,
212            src: self.src.clone(),
213            dst: self.dst.clone(),
214            data: data.into(),
215        }
216        .serialize())
217    }
218    /// Make a disconnect packet.
219    #[must_use]
220    pub fn disconnect(&self) -> Vec<u8> {
221        Packet::Disconnect {
222            port: self.port,
223            pid: self.pid,
224            src: self.src.clone(),
225            dst: self.dst.clone(),
226        }
227        .serialize()
228    }
229}
230
231/// AX.25 connection object.
232///
233/// Created from an AGW object, using `.connect()`.
234pub struct Connection<'a> {
235    port: Port,
236    connect_string: String,
237    pid: Pid,
238    src: Call,
239    dst: Call,
240    agw: &'a mut AGW,
241    disconnected: bool,
242}
243
244impl<'a> Connection<'a> {
245    fn new(
246        agw: &'a mut AGW,
247        port: Port,
248        connect_string: String,
249        pid: Pid,
250        src: Call,
251        dst: Call,
252    ) -> Self {
253        Connection {
254            port,
255            connect_string,
256            pid,
257            src,
258            dst,
259            agw,
260            disconnected: false,
261        }
262    }
263
264    /// Return the connect string.
265    #[must_use]
266    pub fn connect_string(&self) -> &str {
267        &self.connect_string
268    }
269
270    /// Read user data from the connection.
271    ///
272    /// # Errors
273    ///
274    /// If the underlying connection fails.
275    pub fn read(&mut self) -> Result<Vec<u8>> {
276        self.agw.read_connected(&self.src, &self.dst)
277    }
278
279    /// Write data to the connection.
280    ///
281    /// # Errors
282    ///
283    /// If the underlying connection fails.
284    pub fn write(&mut self, data: &[u8]) -> Result<usize> {
285        self.agw
286            .write_connected(self.port, self.pid, &self.src, &self.dst, data)
287    }
288
289    /// Create MakeWriter object, in order to create AGW packets
290    /// without holding on to a connection.
291    #[must_use]
292    pub fn make_writer(&self) -> MakeWriter {
293        MakeWriter {
294            port: self.port,
295            pid: self.pid,
296            src: self.src.clone(),
297            dst: self.dst.clone(),
298        }
299    }
300
301    /// Return a copy of the mpsc to send bytes on the AGW connection.
302    ///
303    /// TODO: this should probably be abstracted away.
304    pub fn sender(&mut self) -> mpsc::Sender<Vec<u8>> {
305        self.agw.sender()
306    }
307
308    /// Disconnect the connection.
309    ///
310    /// # Errors
311    ///
312    /// If the underlying connection fails.
313    pub fn disconnect(&mut self) -> Result<()> {
314        if !self.disconnected {
315            debug!("disconnecting");
316            self.agw.send(
317                &Packet::Disconnect {
318                    port: self.port,
319                    pid: self.pid,
320                    src: self.src.clone(),
321                    dst: self.dst.clone(),
322                }
323                .serialize(),
324            )?;
325            self.disconnected = true;
326        }
327        Ok(())
328    }
329}
330
331impl Drop for Connection<'_> {
332    fn drop(&mut self) {
333        if let Err(e) = self.disconnect() {
334            warn!("drop-disconnection errored with {e:?}");
335        }
336    }
337}
338
339/// Parse header from bytes.
340///
341/// # Errors
342///
343/// If the header is invalid.
344#[allow(clippy::missing_panics_doc)]
345pub fn parse_header(header: &[u8; HEADER_LEN]) -> Result<Header> {
346    let src = Call::from_bytes(&header[8..18])?;
347    let src = if src.is_empty() { None } else { Some(src) };
348    let dst = Call::from_bytes(&header[18..28])?;
349    let dst = if dst.is_empty() { None } else { Some(dst) };
350    Ok(Header::new(
351        Port(header[0]),
352        header[4],
353        Pid(header[6]),
354        src,
355        dst,
356        u32::from_le_bytes(
357            header[28..32]
358                .try_into()
359                .expect("can't happen: bytes to u32"),
360        ),
361    ))
362}
363
/// Commands.
///
/// Only a version command exists so far.
pub enum Command {
    /// Version.
    Version,
}
368
369/// AGW connection.
370pub struct AGW {
371    rx: mpsc::Receiver<(Header, Reply)>,
372
373    // Write entire frames.
374    tx: mpsc::Sender<Vec<u8>>,
375
376    // TODO: LinkedList is not awesome, because it's O(n) to remove an
377    // element in the middle.
378    // Maybe once Rust RFC2570 gets solved, it'll all be fine.
379    rxqueue: LinkedList<(Header, Reply)>,
380}
381
382impl AGW {
383    /// Create AGW connection to ip:port.
384    ///
385    /// # Errors
386    ///
387    /// If connecting to the server fails.
388    pub fn new(addr: &str) -> Result<AGW> {
389        debug!("Creating AGW to {addr}");
390        let (tx, rx) = mpsc::channel();
391        let (tx2, rx2) = mpsc::channel();
392        let wstream = TcpStream::connect(addr).map_err(Error::other)?;
393        let rstream = wstream.try_clone().map_err(Error::other)?;
394        let agw = AGW {
395            rx,
396            tx: tx2,
397            rxqueue: LinkedList::new(),
398        };
399        // Start reader.
400        std::thread::spawn(|| {
401            if let Err(e) = Self::reader(rstream, &tx) {
402                warn!("TCP socket reader connected to AGWPE ended: {e:?}");
403            }
404            drop(tx);
405        });
406        // Start writer.
407        std::thread::spawn(|| {
408            if let Err(e) = Self::writer(wstream, &rx2) {
409                warn!("TCP socket writer connected to AGWPE ended: {e:?}");
410            }
411            drop(rx2);
412        });
413        Ok(agw)
414    }
415
416    fn send(&mut self, msg: &[u8]) -> Result<()> {
417        self.tx.send(msg.to_vec()).map_err(Error::other)?;
418        Ok(())
419    }
420
421    fn sender(&mut self) -> mpsc::Sender<Vec<u8>> {
422        self.tx.clone()
423    }
424
425    fn writer(mut stream: TcpStream, rx: &mpsc::Receiver<Vec<u8>>) -> Result<()> {
426        loop {
427            let buf = rx.recv().map_err(Error::other)?;
428            // TODO: do full write.
429            let _ = stream.write(&buf).map_err(Error::other)?;
430        }
431    }
432
433    fn reader(mut stream: TcpStream, tx: &mpsc::Sender<(Header, Reply)>) -> Result<()> {
434        loop {
435            let mut header = [0_u8; HEADER_LEN];
436            stream.read_exact(&mut header)?;
437            let header = parse_header(&header)?;
438            let payload = if header.data_len > 0 {
439                let mut payload = vec![0; header.data_len as usize];
440                stream.read_exact(&mut payload)?;
441                payload
442            } else {
443                Vec::new()
444            };
445            let reply = parse_reply(&header, &payload)?;
446            trace!("Got reply: {}", reply.description());
447            let done = matches!(reply, Reply::Disconnect);
448            tx.send((header, reply)).map_err(Error::other)?;
449            if done {
450                break Ok(());
451            }
452        }
453    }
454
455    fn rx_enqueue(&mut self, h: Header, r: Reply) {
456        const WARN_LIMIT: usize = 10;
457
458        self.rxqueue.push_back((h, r));
459        let l = self.rxqueue.len();
460        if l > WARN_LIMIT {
461            warn!("AGW maxqueue length {l} > {WARN_LIMIT}");
462        }
463    }
464
465    /// Get the version of the AGW endpoint.
466    ///
467    /// # Errors
468    ///
469    /// If the underlying connection fails.
470    pub fn version(&mut self) -> Result<(u16, u16)> {
471        self.send(&Packet::VersionQuery.serialize())?;
472        loop {
473            let (h, r) = self.rx.recv().map_err(Error::other)?;
474            match r {
475                Reply::Version(maj, min) => return Ok((maj, min)),
476                other => self.rx_enqueue(h, other),
477            }
478        }
479    }
480
481    /// Get the number of outstanding frames on a port.
482    pub fn frames_outstanding(&mut self, port: Port) -> Result<usize> {
483        self.send(&Packet::FramesOutstandingPortQuery(port).serialize())?;
484        loop {
485            let (h, r) = self.rx.recv().map_err(Error::other)?;
486            match r {
487                Reply::FramesOutstandingPort(p, n) if p == port => return Ok(n),
488                other => self.rx_enqueue(h, other),
489            }
490        }
491    }
492
493    /// Get some port info for the AGW endpoint.
494    ///
495    /// # Errors
496    ///
497    /// If the underlying connection fails.
498    pub fn port_info(&mut self) -> Result<PortInfo> {
499        self.send(&Packet::PortInfoQuery.serialize())?;
500        loop {
501            let (h, r) = self.rx.recv().map_err(Error::other)?;
502            match r {
503                Reply::PortInfo(i) => return Ok(i),
504                other => self.rx_enqueue(h, other),
505            }
506        }
507    }
508
509    /// Get port capabilities of the AGW "port".
510    ///
511    /// # Errors
512    ///
513    /// If the underlying connection fails.
514    pub fn port_cap(&mut self, port: Port) -> Result<PortCaps> {
515        self.send(&Packet::PortCapQuery(port).serialize())
516            .map_err(Error::other)?;
517        loop {
518            let (h, r) = self.rx.recv().map_err(Error::other)?;
519            match r {
520                Reply::PortCaps(i) => return Ok(i),
521                other => self.rx_enqueue(h, other),
522            }
523        }
524    }
525
526    /// Send UI packet.
527    ///
528    /// # Errors
529    ///
530    /// If the underlying connection fails.
531    pub fn unproto(
532        &mut self,
533        port: Port,
534        pid: Pid,
535        src: &Call,
536        dst: &Call,
537        data: &[u8],
538    ) -> Result<()> {
539        self.send(
540            &Packet::Unproto {
541                port,
542                pid,
543                src: src.clone(),
544                dst: dst.clone(),
545                data: data.to_vec(),
546            }
547            .serialize(),
548        )?;
549        Ok(())
550    }
551
552    /// Register callsign.
553    ///
554    /// The specs say that registering the callsign is
555    /// mandatory. Direwolf doesn't seem to care, but there it is.
556    ///
557    /// Presumably needed for incoming connection, but incoming
558    /// connections are not tested yet.
559    ///
560    /// # Errors
561    ///
562    /// If underlying connection fails.
563    pub fn register_callsign(&mut self, port: Port, pid: Pid, src: &Call) -> Result<()> {
564        debug!("Registering callsign");
565        self.send(&Packet::RegisterCallsign(port, pid, src.clone()).serialize())?;
566        Ok(())
567    }
568
569    /// Create a new connection.
570    ///
571    /// # Errors
572    ///
573    /// If the underlying connection fails.
574    pub fn connect<'a>(
575        &'a mut self,
576        port: Port,
577        pid: Pid,
578        src: &Call,
579        dst: &Call,
580        via: &[Call],
581    ) -> Result<Connection<'a>> {
582        if via.is_empty() {
583            self.send(
584                &Packet::Connect {
585                    port,
586                    pid,
587                    src: src.clone(),
588                    dst: dst.clone(),
589                }
590                .serialize(),
591            )?;
592        } else {
593            self.send(
594                &Packet::ConnectVia {
595                    port,
596                    pid,
597                    src: src.clone(),
598                    dst: dst.clone(),
599                    via: via.to_vec(),
600                }
601                .serialize(),
602            )?;
603            todo!();
604        }
605        let connect_string;
606        loop {
607            let (head, r) = self.rx.recv().map_err(Error::other)?;
608            if (head.src.as_ref() != Some(dst)) || (head.dst.as_ref() != Some(src)) {
609                //eprintln!("Got packet not for us");
610                continue;
611            }
612            match r {
613                Reply::Connected(i) => {
614                    connect_string = i.clone();
615                    debug!("Connected from {src} to {dst} with connect string {i}");
616                    break;
617                }
618                other => self.rx_enqueue(head, other),
619            }
620        }
621        Ok(Connection::new(
622            self,
623            port,
624            connect_string,
625            pid,
626            src.clone(),
627            dst.clone(),
628        ))
629    }
630
631    fn write_connected(
632        &mut self,
633        port: Port,
634        pid: Pid,
635        src: &Call,
636        dst: &Call,
637        data: &[u8],
638    ) -> Result<usize> {
639        // TODO: enforce max size?
640        let len = data.len();
641        if len > 0 {
642            self.send(
643                &Packet::Data {
644                    port,
645                    pid,
646                    src: src.clone(),
647                    dst: dst.clone(),
648                    data: data.to_vec(),
649                }
650                .serialize(),
651            )?;
652        }
653        Ok(data.len())
654    }
655
656    fn read_connected(&mut self, me: &Call, remote: &Call) -> Result<Vec<u8>> {
657        // First check the existing queue.
658        for frame in self.rxqueue.iter().enumerate() {
659            let (n, (head, payload)) = &frame;
660            if (head.src.as_ref() != Some(remote)) || (head.dst.as_ref() != Some(me)) {
661                continue;
662            }
663            match payload {
664                Reply::ConnectedData(data) => {
665                    let ret = data.clone();
666                    let mut tail = self.rxqueue.split_off(*n);
667                    tail.pop_front();
668                    self.rxqueue.append(&mut tail);
669                    return Ok(ret);
670                }
671                Reply::Disconnect => {
672                    return Err(Error::msg("remote end disconnected"));
673                }
674                _ => {
675                    debug!("Remote end send unexpected data {}", payload.description());
676                }
677            }
678        }
679
680        // Next packet not in the queue. Wait.
681        loop {
682            let (h, r) = self.rx.recv().map_err(Error::other)?;
683            match r {
684                Reply::ConnectedData(i) => return Ok(i),
685                other => self.rx_enqueue(h, other),
686            }
687        }
688    }
689}