1pub mod defaultconfig;
2pub mod flight;
3
4use datastreamcorelib::datamessage::PubSubDataMessage;
5use datastreamservicelib::heartbeat::heartbeat_task;
6use datastreamservicelib::utils::{configval_as_string_vec, wait_for_tasks};
7use datastreamservicelib::zmqwrappers::TokioPubSubManager;
8use datastreamservicelib::TerminationFlag;
9use datastreamcorelib::pubsub::PubSubManager;
11use failure::Fallible;
12use flight::{Flights, Message};
13use futures::stream::FuturesUnordered;
14use futures::{future, StreamExt};
15use log;
16use std::net::SocketAddr;
17use std::sync::atomic::Ordering;
18use std::time::Duration;
19use tokio::net::TcpStream;
20use tokio::time::delay_for;
21use tokio_util;
22use tokio_util::codec::{FramedRead, LinesCodec};
23use toml;
24
25pub async fn mainloop(config: toml::Value, term: TerminationFlag) -> Fallible<()> {
27 let dump1090addr = match validate_dump1090addr(&config) {
29 Ok(addr) => addr,
30 Err(e) => {
31 return Err(failure::err_msg(format!(
32 "dump1090 address validation failed: {}",
33 e
34 )));
35 }
36 };
37 let track: bool = match config["process"]["track"].as_bool() {
40 Some(track) => track,
41 None => false,
42 };
43 let required_fields: Vec<String> = match config["process"]["required_fields"].as_array() {
46 Some(required_fields) => required_fields
47 .to_vec()
48 .iter()
49 .map(|x| String::from(x.as_str().unwrap()))
50 .collect(),
51 None => vec![],
52 };
53
54 let psmgr = TokioPubSubManager::instance();
56
57 match psmgr.lock() {
59 Err(_) => {
60 term.store(true, Ordering::Relaxed);
61 return Err(failure::err_msg("Could not lock psmgr"));
62 }
63 Ok(mut psmgr_locked) => {
64 psmgr_locked.term_flag = term.clone();
65 psmgr_locked
66 .set_default_pub_uris(&configval_as_string_vec(&config["zmq"]["pub_sockets"])?)?;
67 }
68 }
69 delay_for(Duration::from_millis(150)).await;
71
72 let tasks = FuturesUnordered::new();
74 tasks.push(tokio::spawn(heartbeat_task(term.clone())));
75 tasks.push(tokio::spawn(reader_publisher_task(
76 track,
77 required_fields,
78 dump1090addr,
79 "messages".to_string(),
80 term.clone(),
81 )));
82 wait_for_tasks(tasks).await?;
83 Ok(())
84}
85
86pub fn validate_dump1090addr(config: &toml::Value) -> Fallible<SocketAddr> {
88 let host: String = match config["dump1090"]["host"].as_str() {
89 Some(host) => host.to_string(),
90 None => {
91 return Err(failure::err_msg("dump1090.host not supplied"));
92 }
93 };
94 let port: u16 = match config["dump1090"]["port"].as_integer() {
95 Some(port) => {
96 if port < 1 || port > 65535 {
97 return Err(failure::err_msg(
98 "dump1090.port does not look like a valid TCP port number",
99 ));
100 }
101 port as u16
102 }
103 None => 30003,
104 };
105
106 let dump1090addr = match format!("{}:{}", host, port).parse::<SocketAddr>() {
107 Ok(addr) => addr,
108 Err(e) => {
109 return Err(failure::err_msg(format!(
110 "Could not parse dump1090 address: {}",
111 e
112 )))
113 }
114 };
115
116 Ok(dump1090addr)
117}
118
119async fn reader_publisher_task(
121 track: bool,
122 required_fields: Vec<String>,
123 addr: SocketAddr,
124 topic: String,
125 term: TerminationFlag,
126) -> Fallible<()> {
127 let psmgr = TokioPubSubManager::instance();
128 let mut flights = Flights::new(required_fields);
129 loop {
130 tokio::task::yield_now().await;
134 if term.load(Ordering::Relaxed) {
135 log::trace!("Got term flag, exiting reader task");
136 return Ok(());
137 }
138
139 let mut stream = match TcpStream::connect(&addr).await {
141 Ok(stream) => {
142 log::debug!("Connected: {:?}", stream);
143 stream
144 }
145 Err(e) => {
146 log::debug!("Connection failed to: {:?}: {}, retrying", addr, e);
147 delay_for(Duration::from_millis(1000)).await;
148 continue;
149 }
150 };
151 let (r, _w) = stream.split();
152 let mut stream =
153 FramedRead::new(r, LinesCodec::new_with_max_length(1024)).filter_map(|i| match i {
154 Ok(i) => future::ready(Some(i)),
155 Err(e) => {
156 log::error!("Could not read from socket: {}", e);
157 future::ready(None)
158 }
159 });
160
161 while let Some(line) = stream.next().await {
163 if term.load(Ordering::Relaxed) {
164 log::trace!("Got term flag, exiting reader task");
165 return Ok(());
166 }
167
168 let mut adsbmessage = Message::parse(&line);
170 let mut pubsubmessage = PubSubDataMessage::new(topic.clone())?;
171
172 if track && adsbmessage.valid() {
174 if let Some(_flight) = flights.update(&mut adsbmessage) {
175 pubsubmessage.data = _flight.values.json();
176 } else {
177 continue;
178 }
179 } else if !track && adsbmessage.valid() {
180 pubsubmessage.data = adsbmessage.json();
181 } else {
182 continue;
183 }
184
185 match psmgr.lock() {
187 Err(e) => {
188 log::debug!("Could not acquire psmgr lock: {:?}", e);
190 continue;
191 }
192 Ok(psmgr_locked) => {
193 psmgr_locked.publish(&pubsubmessage)?;
194 }
195 }
196 }
197
198 delay_for(Duration::from_millis(1000)).await;
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use super::*;
206 use crate::defaultconfig::default_config_str;
207 use datastreamcorelib::logging::init_logging;
208 use std::env::temp_dir;
209 use std::sync::atomic::AtomicBool;
210 use std::sync::Arc;
211 use toml;
212
213 #[tokio::test]
214 async fn mainloop_heartbeat() {
215 init_logging(log::LevelFilter::Trace).unwrap();
216 let term: TerminationFlag = Arc::new(AtomicBool::new(false));
217 let mut config: toml::Value = toml::from_str(default_config_str().as_str()).unwrap();
218 let mut tmppath1 = temp_dir();
219 tmppath1.push("3f7639c2-df18-49fa-a788-11a0486e7ac1_pub.sock");
220 let sockpath1 = "ipc://".to_string() + &tmppath1.to_string_lossy();
221 let new_toml_sockets = toml::Value::try_from(vec![sockpath1.clone()]).unwrap();
222 config["zmq"]["pub_sockets"] = new_toml_sockets;
223 assert_eq!(
224 configval_as_string_vec(&config["zmq"]["pub_sockets"]).unwrap(),
225 vec![sockpath1.clone()]
226 );
227 let maintask = tokio::spawn(mainloop(config, term.clone()));
228
229 delay_for(Duration::from_millis(1000)).await;
231
232 term.store(true, Ordering::Relaxed);
234 maintask.await.expect("Main task failed").unwrap();
235 }
236}