simple_aprs/
lib.rs

1mod abort;
2mod codec;
3mod error;
4
5use aprs_parser::{AprsPacket, DecodeError};
6use async_stream::try_stream;
7use futures::sink::SinkExt;
8use futures::{Stream, StreamExt};
9use log::{info, trace};
10use std::io;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::net::{
14    tcp::{OwnedReadHalf, OwnedWriteHalf},
15    TcpStream,
16};
17use tokio::sync::Mutex;
18use tokio::task::JoinHandle;
19use tokio::time;
20use tokio_util::codec::{FramedRead, FramedWrite};
21
22// how many seconds to wait for a line from the APRS-IS server
23// used to detect a connection that has hanged
24const HEARTBEAT_TIME: u64 = 60;
25
26type Reader = FramedRead<OwnedReadHalf, codec::ByteLinesCodec>;
27type Writer = FramedWrite<OwnedWriteHalf, codec::ByteLinesCodec>;
28
29pub struct RawPacket {
30    pub raw: Vec<u8>,
31}
32
33impl RawPacket {
34    pub fn parsed(&self) -> Result<AprsPacket, DecodeError> {
35        AprsPacket::decode_textual(&self.raw)
36    }
37}
38
39pub struct ISSettings {
40    pub host: String,
41    pub port: u16,
42    pub callsign: String,
43    pub passcode: String,
44    pub filter: String,
45}
46
47impl ISSettings {
48    pub fn new(
49        host: String,
50        port: u16,
51        callsign: String,
52        passcode: String,
53        filter: String,
54    ) -> ISSettings {
55        ISSettings {
56            host,
57            port,
58            callsign,
59            passcode,
60            filter,
61        }
62    }
63}
64
65pub struct ISReader {
66    reader: Reader,
67
68    // Used to kill the heartbeat task
69    // Once reader + writer are out of scope
70    #[allow(dead_code)]
71    handler: Arc<Mutex<abort::AbortOnDrop<()>>>,
72}
73
74impl ISReader {
75    pub fn stream(&mut self) -> impl Stream<Item = Result<RawPacket, error::ISReadError>> + '_ {
76        try_stream! {
77            while let Some(packet) = tokio::time::timeout(Duration::from_secs(HEARTBEAT_TIME), self.reader.next()).await? {
78                let packet = packet?;
79                if packet[0] == b'#' {
80                    let server_message = String::from_utf8(packet.to_vec())?;
81                    trace!("Received server response: {}", server_message);
82                    if server_message.contains("unverified") {
83                        info!("User not verified on APRS-IS server");
84                        continue;
85                    }
86                    if server_message.contains(" verified") {
87                        info!("User verified on APRS-IS server");
88                    }
89                } else {
90                    trace!("{:?}", packet);
91                    yield RawPacket {
92                        raw: packet.to_vec(),
93                    };
94                }
95            }
96        }
97    }
98}
99
100#[derive(Clone)]
101pub struct ISWriter {
102    writer: Arc<Mutex<Writer>>,
103
104    // Used to kill the heartbeat task
105    // Once reader + writer are out of scope
106    #[allow(dead_code)]
107    handler: Arc<Mutex<abort::AbortOnDrop<()>>>,
108}
109
110impl ISWriter {
111    pub async fn send(&mut self, packet: &AprsPacket) -> Result<(), error::ISSendError> {
112        let mut buf = vec![];
113        packet.encode_textual(&mut buf)?;
114
115        self.writer.lock().await.send(&buf).await?;
116
117        Ok(())
118    }
119}
120
121pub struct ISConnection {
122    reader: ISReader,
123    writer: ISWriter,
124}
125
126impl ISConnection {
127    pub async fn connect(settings: &ISSettings) -> Result<Self, io::Error> {
128        let (reader, mut writer) = Self::init_connect(settings).await?;
129        Self::login(settings, &mut writer).await?;
130
131        let writer = Arc::new(Mutex::new(writer));
132        let handler = Arc::new(Mutex::new(abort::AbortOnDrop::new(
133            Self::heartbeat(writer.clone()).await,
134        )));
135
136        let reader = ISReader {
137            reader,
138            handler: handler.clone(),
139        };
140        let writer = ISWriter { writer, handler };
141
142        Ok(Self { reader, writer })
143    }
144
145    pub fn stream(&mut self) -> impl Stream<Item = Result<RawPacket, error::ISReadError>> + '_ {
146        self.reader.stream()
147    }
148
149    pub async fn send(&mut self, packet: &AprsPacket) -> Result<(), error::ISSendError> {
150        self.writer.send(packet).await
151    }
152
153    pub fn split(self) -> (ISReader, ISWriter) {
154        (self.reader, self.writer)
155    }
156
157    async fn init_connect(settings: &ISSettings) -> io::Result<(Reader, Writer)> {
158        let address = format!("{}:{}", settings.host, settings.port);
159
160        let stream = TcpStream::connect(address).await?;
161
162        let (r, w) = stream.into_split();
163
164        let writer = FramedWrite::new(w, codec::ByteLinesCodec::new());
165        let reader = FramedRead::new(r, codec::ByteLinesCodec::new());
166
167        Ok((reader, writer))
168    }
169
170    async fn login(settings: &ISSettings, writer: &mut Writer) -> io::Result<()> {
171        let login_message = {
172            let name = option_env!("CARGO_PKG_NAME").unwrap_or("unknown");
173            let version = option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0");
174
175            format!(
176                "user {} pass {} vers {} {}{}",
177                settings.callsign,
178                settings.passcode,
179                name,
180                version,
181                if settings.filter.is_empty() {
182                    "".to_string()
183                } else {
184                    format!(" filter {}", settings.filter)
185                }
186            )
187        };
188
189        info!("Logging on to APRS-IS server");
190        trace!("Login message: {}", login_message);
191        writer.send(login_message.as_bytes()).await?;
192
193        Ok(())
194    }
195
196    async fn heartbeat(writer: Arc<Mutex<Writer>>) -> JoinHandle<()> {
197        // Automatically terminates once the reader and writer are dropped
198        tokio::spawn(async move {
199            let mut interval = time::interval(Duration::from_secs(3600));
200            loop {
201                interval.tick().await;
202                info!("Sending keep alive message to APRS-IS server");
203                if writer
204                    .lock()
205                    .await
206                    .send("# keep alive".as_bytes())
207                    .await
208                    .is_err()
209                {
210                    break;
211                }
212            }
213        })
214    }
215}