Skip to main content

ds1090/
lib.rs

1pub mod defaultconfig;
2pub mod flight;
3
4use datastreamcorelib::datamessage::PubSubDataMessage;
5use datastreamservicelib::heartbeat::heartbeat_task;
6use datastreamservicelib::utils::{configval_as_string_vec, wait_for_tasks};
7use datastreamservicelib::zmqwrappers::TokioPubSubManager;
8use datastreamservicelib::TerminationFlag;
9// we need to explicitly import this trait for it to work
10use datastreamcorelib::pubsub::PubSubManager;
11use failure::Fallible;
12use flight::{Flights, Message};
13use futures::stream::FuturesUnordered;
14use futures::{future, StreamExt};
15use log;
16use std::net::SocketAddr;
17use std::sync::atomic::Ordering;
18use std::time::Duration;
19use tokio::net::TcpStream;
20use tokio::time::delay_for;
21use tokio_util;
22use tokio_util::codec::{FramedRead, LinesCodec};
23use toml;
24
25/// Main entrypoint
26pub async fn mainloop(config: toml::Value, term: TerminationFlag) -> Fallible<()> {
27    // Form dump1090 address
28    let dump1090addr = match validate_dump1090addr(&config) {
29        Ok(addr) => addr,
30        Err(e) => {
31            return Err(failure::err_msg(format!(
32                "dump1090 address validation failed: {}",
33                e
34            )));
35        }
36    };
37    // Whether or not to combine data from multiple packets and only output
38    // when full flight info is available, or it changes
39    let track: bool = match config["process"]["track"].as_bool() {
40        Some(track) => track,
41        None => false,
42    };
43    // These need be available for a Flight
44    // FIXME: turn these into an enum
45    let required_fields: Vec<String> = match config["process"]["required_fields"].as_array() {
46        Some(required_fields) => required_fields
47            .to_vec()
48            .iter()
49            .map(|x| String::from(x.as_str().unwrap()))
50            .collect(),
51        None => vec![],
52    };
53
54    // Instances of things we are going to need
55    let psmgr = TokioPubSubManager::instance();
56
57    // Just using ? in the lock will fail with something not being sendable
58    match psmgr.lock() {
59        Err(_) => {
60            term.store(true, Ordering::Relaxed);
61            return Err(failure::err_msg("Could not lock psmgr"));
62        }
63        Ok(mut psmgr_locked) => {
64            psmgr_locked.term_flag = term.clone();
65            psmgr_locked
66                .set_default_pub_uris(&configval_as_string_vec(&config["zmq"]["pub_sockets"])?)?;
67        }
68    }
69    // Give the socket a moment to stabilize
70    delay_for(Duration::from_millis(150)).await;
71
72    // Create tasks
73    let tasks = FuturesUnordered::new();
74    tasks.push(tokio::spawn(heartbeat_task(term.clone())));
75    tasks.push(tokio::spawn(reader_publisher_task(
76        track,
77        required_fields,
78        dump1090addr,
79        "messages".to_string(),
80        term.clone(),
81    )));
82    wait_for_tasks(tasks).await?;
83    Ok(())
84}
85
86/// Validates that dump1090 address and port in configuration look sane
87pub fn validate_dump1090addr(config: &toml::Value) -> Fallible<SocketAddr> {
88    let host: String = match config["dump1090"]["host"].as_str() {
89        Some(host) => host.to_string(),
90        None => {
91            return Err(failure::err_msg("dump1090.host not supplied"));
92        }
93    };
94    let port: u16 = match config["dump1090"]["port"].as_integer() {
95        Some(port) => {
96            if port < 1 || port > 65535 {
97                return Err(failure::err_msg(
98                    "dump1090.port does not look like a valid TCP port number",
99                ));
100            }
101            port as u16
102        }
103        None => 30003,
104    };
105
106    let dump1090addr = match format!("{}:{}", host, port).parse::<SocketAddr>() {
107        Ok(addr) => addr,
108        Err(e) => {
109            return Err(failure::err_msg(format!(
110                "Could not parse dump1090 address: {}",
111                e
112            )))
113        }
114    };
115
116    Ok(dump1090addr)
117}
118
119/// Reader task reads dump1090 messages over TCP, and parses and PUBlishes them
120async fn reader_publisher_task(
121    track: bool,
122    required_fields: Vec<String>,
123    addr: SocketAddr,
124    topic: String,
125    term: TerminationFlag,
126) -> Fallible<()> {
127    let psmgr = TokioPubSubManager::instance();
128    let mut flights = Flights::new(required_fields);
129    loop {
130        // We can't await inside the lock match so yield here instead to the arm
131        // where we fail to acquire the lock
132        // https://docs.rs/tokio/0.2.16/tokio/fn.spawn.html#using-send-values-from-a-task
133        tokio::task::yield_now().await;
134        if term.load(Ordering::Relaxed) {
135            log::trace!("Got term flag, exiting reader task");
136            return Ok(());
137        }
138
139        // Connect to dump1090 port
140        let mut stream = match TcpStream::connect(&addr).await {
141            Ok(stream) => {
142                log::debug!("Connected: {:?}", stream);
143                stream
144            }
145            Err(e) => {
146                log::debug!("Connection failed to: {:?}: {}, retrying", addr, e);
147                delay_for(Duration::from_millis(1000)).await;
148                continue;
149            }
150        };
151        let (r, _w) = stream.split();
152        let mut stream =
153            FramedRead::new(r, LinesCodec::new_with_max_length(1024)).filter_map(|i| match i {
154                Ok(i) => future::ready(Some(i)),
155                Err(e) => {
156                    log::error!("Could not read from socket: {}", e);
157                    future::ready(None)
158                }
159            });
160
161        // Consume input lines
162        while let Some(line) = stream.next().await {
163            if term.load(Ordering::Relaxed) {
164                log::trace!("Got term flag, exiting reader task");
165                return Ok(());
166            }
167
168            // Attempt to parse a line into a message
169            let mut adsbmessage = Message::parse(&line);
170            let mut pubsubmessage = PubSubDataMessage::new(topic.clone())?;
171
172            // Track flights if configured
173            if track && adsbmessage.valid() {
174                if let Some(_flight) = flights.update(&mut adsbmessage) {
175                    pubsubmessage.data = _flight.values.json();
176                } else {
177                    continue;
178                }
179            } else if !track && adsbmessage.valid() {
180                pubsubmessage.data = adsbmessage.json();
181            } else {
182                continue;
183            }
184
185            // Publish message
186            match psmgr.lock() {
187                Err(e) => {
188                    // Wait for the socket lock
189                    log::debug!("Could not acquire psmgr lock: {:?}", e);
190                    continue;
191                }
192                Ok(psmgr_locked) => {
193                    psmgr_locked.publish(&pubsubmessage)?;
194                }
195            }
196        }
197
198        // Pause for a while before reconnect
199        delay_for(Duration::from_millis(1000)).await;
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::defaultconfig::default_config_str;
    use datastreamcorelib::logging::init_logging;
    use std::env::temp_dir;
    use std::sync::atomic::AtomicBool;
    use std::sync::Arc;
    use toml;

    /// Runs `mainloop` against the default configuration with the PUB socket
    /// redirected to an IPC path in the temp directory, lets it run for a
    /// second, then raises the termination flag and expects a clean exit.
    #[tokio::test]
    async fn mainloop_heartbeat() {
        init_logging(log::LevelFilter::Trace).unwrap();

        // Shared shutdown flag handed to the main loop
        let term: TerminationFlag = Arc::new(AtomicBool::new(false));

        // Default config, with the PUB socket replaced by a test-specific one
        let mut config: toml::Value = toml::from_str(default_config_str().as_str()).unwrap();
        let sock_file = temp_dir().join("3f7639c2-df18-49fa-a788-11a0486e7ac1_pub.sock");
        let pub_uri = format!("ipc://{}", sock_file.to_string_lossy());
        config["zmq"]["pub_sockets"] = toml::Value::try_from(vec![pub_uri.clone()]).unwrap();

        // Make sure the override is what the service will read back
        let configured_uris = configval_as_string_vec(&config["zmq"]["pub_sockets"]).unwrap();
        assert_eq!(configured_uris, vec![pub_uri.clone()]);

        let main_handle = tokio::spawn(mainloop(config, term.clone()));

        // Be fancy and get rid of the delay
        delay_for(Duration::from_millis(1000)).await;

        // Tell things to shut down
        term.store(true, Ordering::Relaxed);
        let outcome = main_handle.await.expect("Main task failed");
        outcome.unwrap();
    }
}
236}