1mod abort;
2mod codec;
3mod error;
4
5use aprs_parser::{AprsPacket, DecodeError};
6use async_stream::try_stream;
7use futures::sink::SinkExt;
8use futures::{Stream, StreamExt};
9use log::{info, trace};
10use std::io;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::net::{
14 tcp::{OwnedReadHalf, OwnedWriteHalf},
15 TcpStream,
16};
17use tokio::sync::Mutex;
18use tokio::task::JoinHandle;
19use tokio::time;
20use tokio_util::codec::{FramedRead, FramedWrite};
21
/// Read timeout, in seconds, that `ISReader::stream` applies to each wait for
/// the next line from the server. When it elapses the stream yields an error.
///
/// Despite the name this is not the interval at which keep-alive lines are
/// sent; that interval is set separately inside `ISConnection::heartbeat`.
const HEARTBEAT_TIME: u64 = 60;
25
/// Owned read half of the TCP stream, framed with `codec::ByteLinesCodec`.
type Reader = FramedRead<OwnedReadHalf, codec::ByteLinesCodec>;
/// Owned write half of the TCP stream, framed with `codec::ByteLinesCodec`.
type Writer = FramedWrite<OwnedWriteHalf, codec::ByteLinesCodec>;
28
/// One undecoded line received from the server.
///
/// The bytes are kept exactly as they were read so that callers can decide
/// whether (and when) to pay for decoding via `RawPacket::parsed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawPacket {
    /// The line's bytes as produced by the framing codec.
    pub raw: Vec<u8>,
}
32
33impl RawPacket {
34 pub fn parsed(&self) -> Result<AprsPacket, DecodeError> {
35 AprsPacket::decode_textual(&self.raw)
36 }
37}
38
/// Everything needed to open and log in to an APRS-IS connection.
pub struct ISSettings {
    /// Server host; joined with `port` as `host:port` to form the connect address.
    pub host: String,
    /// Server TCP port.
    pub port: u16,
    /// Value sent after `user` in the login line.
    pub callsign: String,
    /// Value sent after `pass` in the login line.
    pub passcode: String,
    /// Value sent after `filter` in the login line; when empty, the `filter`
    /// clause is left out of the login line altogether.
    pub filter: String,
}
46
47impl ISSettings {
48 pub fn new(
49 host: String,
50 port: u16,
51 callsign: String,
52 passcode: String,
53 filter: String,
54 ) -> ISSettings {
55 ISSettings {
56 host,
57 port,
58 callsign,
59 passcode,
60 filter,
61 }
62 }
63}
64
/// Receiving side of an APRS-IS connection.
pub struct ISReader {
    /// Framed read half of the TCP stream; each item is one line from the server.
    reader: Reader,

    // Not read anywhere in this file; held only so that this value keeps a
    // reference to the shared guard that `ISConnection::connect` wraps around
    // the keep-alive task.
    // NOTE(review): presumably `abort::AbortOnDrop` aborts that task once the
    // last holder is dropped — confirm in the `abort` module.
    #[allow(dead_code)]
    handler: Arc<Mutex<abort::AbortOnDrop<()>>>,
}
73
74impl ISReader {
75 pub fn stream(&mut self) -> impl Stream<Item = Result<RawPacket, error::ISReadError>> + '_ {
76 try_stream! {
77 while let Some(packet) = tokio::time::timeout(Duration::from_secs(HEARTBEAT_TIME), self.reader.next()).await? {
78 let packet = packet?;
79 if packet[0] == b'#' {
80 let server_message = String::from_utf8(packet.to_vec())?;
81 trace!("Received server response: {}", server_message);
82 if server_message.contains("unverified") {
83 info!("User not verified on APRS-IS server");
84 continue;
85 }
86 if server_message.contains(" verified") {
87 info!("User verified on APRS-IS server");
88 }
89 } else {
90 trace!("{:?}", packet);
91 yield RawPacket {
92 raw: packet.to_vec(),
93 };
94 }
95 }
96 }
97 }
98}
99
/// Sending side of an APRS-IS connection.
///
/// Cloning is cheap: every clone shares the same underlying writer and the
/// same keep-alive guard through `Arc`.
#[derive(Clone)]
pub struct ISWriter {
    /// Framed write half of the TCP stream, behind a mutex because the
    /// keep-alive task spawned by `ISConnection::heartbeat` writes to it too.
    writer: Arc<Mutex<Writer>>,

    // Not read anywhere in this file; held only so that this value keeps a
    // reference to the shared guard that `ISConnection::connect` wraps around
    // the keep-alive task.
    // NOTE(review): presumably `abort::AbortOnDrop` aborts that task once the
    // last holder is dropped — confirm in the `abort` module.
    #[allow(dead_code)]
    handler: Arc<Mutex<abort::AbortOnDrop<()>>>,
}
109
110impl ISWriter {
111 pub async fn send(&mut self, packet: &AprsPacket) -> Result<(), error::ISSendError> {
112 let mut buf = vec![];
113 packet.encode_textual(&mut buf)?;
114
115 self.writer.lock().await.send(&buf).await?;
116
117 Ok(())
118 }
119}
120
/// A logged-in APRS-IS connection that owns both directions of the link.
///
/// Use `split` to obtain the two halves as independent values.
pub struct ISConnection {
    /// Receiving half.
    reader: ISReader,
    /// Sending half.
    writer: ISWriter,
}
125
126impl ISConnection {
127 pub async fn connect(settings: &ISSettings) -> Result<Self, io::Error> {
128 let (reader, mut writer) = Self::init_connect(settings).await?;
129 Self::login(settings, &mut writer).await?;
130
131 let writer = Arc::new(Mutex::new(writer));
132 let handler = Arc::new(Mutex::new(abort::AbortOnDrop::new(
133 Self::heartbeat(writer.clone()).await,
134 )));
135
136 let reader = ISReader {
137 reader,
138 handler: handler.clone(),
139 };
140 let writer = ISWriter { writer, handler };
141
142 Ok(Self { reader, writer })
143 }
144
145 pub fn stream(&mut self) -> impl Stream<Item = Result<RawPacket, error::ISReadError>> + '_ {
146 self.reader.stream()
147 }
148
149 pub async fn send(&mut self, packet: &AprsPacket) -> Result<(), error::ISSendError> {
150 self.writer.send(packet).await
151 }
152
153 pub fn split(self) -> (ISReader, ISWriter) {
154 (self.reader, self.writer)
155 }
156
157 async fn init_connect(settings: &ISSettings) -> io::Result<(Reader, Writer)> {
158 let address = format!("{}:{}", settings.host, settings.port);
159
160 let stream = TcpStream::connect(address).await?;
161
162 let (r, w) = stream.into_split();
163
164 let writer = FramedWrite::new(w, codec::ByteLinesCodec::new());
165 let reader = FramedRead::new(r, codec::ByteLinesCodec::new());
166
167 Ok((reader, writer))
168 }
169
170 async fn login(settings: &ISSettings, writer: &mut Writer) -> io::Result<()> {
171 let login_message = {
172 let name = option_env!("CARGO_PKG_NAME").unwrap_or("unknown");
173 let version = option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0");
174
175 format!(
176 "user {} pass {} vers {} {}{}",
177 settings.callsign,
178 settings.passcode,
179 name,
180 version,
181 if settings.filter.is_empty() {
182 "".to_string()
183 } else {
184 format!(" filter {}", settings.filter)
185 }
186 )
187 };
188
189 info!("Logging on to APRS-IS server");
190 trace!("Login message: {}", login_message);
191 writer.send(login_message.as_bytes()).await?;
192
193 Ok(())
194 }
195
196 async fn heartbeat(writer: Arc<Mutex<Writer>>) -> JoinHandle<()> {
197 tokio::spawn(async move {
199 let mut interval = time::interval(Duration::from_secs(3600));
200 loop {
201 interval.tick().await;
202 info!("Sending keep alive message to APRS-IS server");
203 if writer
204 .lock()
205 .await
206 .send("# keep alive".as_bytes())
207 .await
208 .is_err()
209 {
210 break;
211 }
212 }
213 })
214 }
215}