#![doc = include_str!("../readme.md")]
mod dedup;
mod filters;
mod sensor;
mod shell;
mod snapshot;
mod source;
mod table;
mod tui;
mod util;
mod web;
use crate::tui::Event;
use crate::util::expanduser;
use crate::web::serve_web_api;
use clap::{Command, CommandFactory, Parser, ValueHint};
use clap_complete::{generate, Generator};
use crossterm::event::KeyCode;
use ratatui::widgets::*;
use redis::AsyncCommands;
use rs1090::data::aircraft;
use rs1090::decode::commb::MessageProcessor;
use rs1090::decode::cpr::{decode_position, AircraftState};
use rs1090::decode::serialize_config;
use rs1090::prelude::*;
use sensor::Sensor;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::io;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::sync::{watch, Mutex, RwLock};
use tokio::time::{sleep, Duration};
use tracing::warn;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
#[derive(Default, Deserialize, Parser)]
#[command(
name = "jet1090",
version,
author = "xoolive",
about = "Decode and serve Mode S demodulated raw messages"
)]
struct Options {
#[arg(short, long)]
#[serde(default)]
verbose: bool,
#[arg(short, long, default_value=None, value_hint=ValueHint::FilePath)]
output: Option<String>,
#[arg(short, long)]
#[serde(default)]
interactive: bool,
#[arg(long)]
#[serde(default)]
flags: bool,
#[arg(long, default_value=None)]
serve_port: Option<u16>,
#[arg(
long,
env = "EXPIRE_AIRCRAFT",
short = 'x',
conflicts_with = "no_history_expire"
)]
history_expire: Option<u64>,
#[arg(long, conflicts_with = "history_expire")]
#[serde(default)]
no_history_expire: bool,
#[arg(
long,
default_value = "30",
conflicts_with = "no_interactive_expire"
)]
interactive_expire: Option<u64>,
#[arg(long, conflicts_with = "interactive_expire")]
#[serde(default)]
no_interactive_expire: bool,
#[arg(long, value_name = "DF")]
df_filter: Option<Vec<u16>>,
#[arg(long, value_name = "ICAO24")]
aircraft_filter: Option<Vec<ICAO>>,
#[arg(long, default_value=None)]
#[serde(default)]
prevent_sleep: bool,
#[arg(short, long, default_value=None)]
#[serde(default)]
update_position: bool,
#[arg(long, default_value = "450")]
deduplication: Option<u32>,
#[arg(long, default_value = "200")]
reorder_window: Option<u32>,
#[arg(long, default_value=None)]
#[serde(default)]
no_deduplication: bool,
#[arg(long)]
#[serde(default)]
stats: bool,
#[arg(long = "completion", value_enum)]
#[serde(skip)]
completion: Option<shell::Shell>,
#[arg(long)]
#[serde(skip)]
update_db: bool,
#[serde(default)]
sources: Vec<source::Source>,
#[arg(short, long, value_name = "FILE")]
log_file: Option<String>,
#[arg(short, long, value_name = "REDIS URL")]
redis_url: Option<String>,
#[arg(long, value_name = "REDIS TOPIC")]
redis_topic: Option<String>,
#[arg(long, value_name = "SECONDS")]
#[serde(default)]
redis_retry_interval: Option<u64>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok();
let mut options = Options::default();
let mut cfg_path = match std::env::var("XDG_CONFIG_HOME") {
Ok(xdg_config) => expanduser(PathBuf::from(xdg_config)),
Err(_) => dirs::config_dir().unwrap_or_default(),
};
cfg_path.push("jet1090");
cfg_path.push("config.toml");
if cfg_path.exists() {
let string = fs::read_to_string(cfg_path).await.ok().unwrap();
options = toml::from_str(&string).unwrap();
}
if let Ok(config_file) = std::env::var("JET1090_CONFIG") {
let path = expanduser(PathBuf::from(config_file));
let string = fs::read_to_string(path)
.await
.expect("Configuration file not found");
options = toml::from_str(&string).unwrap();
}
let mut cli_options = Options::parse();
if let Some(generator) = cli_options.completion {
let mut cmd = Options::command();
print_completions(generator, &mut cmd);
return Ok(());
}
if cli_options.update_db {
aircraft::update_db().await.unwrap();
return Ok(());
}
if cli_options.verbose {
options.verbose = true;
}
if cli_options.output.is_some() {
options.output = cli_options.output;
}
if cli_options.interactive {
options.interactive = true;
}
if cli_options.flags {
options.flags = true;
}
if cli_options.serve_port.is_some() {
options.serve_port = cli_options.serve_port;
}
if cli_options.no_history_expire {
options.no_history_expire = true;
options.history_expire = None;
} else if let Some(history_expire) = cli_options.history_expire {
options.no_history_expire = false;
options.history_expire = Some(history_expire);
}
if cli_options.no_interactive_expire {
options.interactive_expire = Some(0);
} else if let Some(interactive_expire) = cli_options.interactive_expire {
options.interactive_expire = Some(interactive_expire);
}
if cli_options.df_filter.is_some() {
options.df_filter = cli_options.df_filter;
}
if cli_options.aircraft_filter.is_some() {
options.aircraft_filter = cli_options.aircraft_filter;
}
if cli_options.prevent_sleep {
options.prevent_sleep = cli_options.prevent_sleep;
}
if cli_options.update_position {
options.update_position = cli_options.update_position;
}
if cli_options.log_file.is_some() {
options.log_file = cli_options.log_file;
}
if cli_options.redis_url.is_some() {
options.redis_url = cli_options.redis_url;
}
if cli_options.redis_topic.is_some() {
options.redis_topic = cli_options.redis_topic;
}
if cli_options.redis_retry_interval.is_some() {
options.redis_retry_interval = cli_options.redis_retry_interval;
}
if cli_options.stats {
options.stats = cli_options.stats;
}
if cli_options.deduplication.is_some() {
options.deduplication = cli_options.deduplication;
}
if cli_options.reorder_window.is_some() {
options.reorder_window = cli_options.reorder_window;
}
if options.stats {
serialize_config(true);
}
if options.history_expire.is_none() && !options.no_history_expire {
options.history_expire = Some(15);
}
options.sources.append(&mut cli_options.sources);
let env_filter = EnvFilter::from_default_env();
let subscriber = tracing_subscriber::registry().with(env_filter);
match options.log_file.as_deref() {
Some("-") if !cli_options.interactive => {
subscriber.with(fmt::layer().pretty()).init();
}
Some(log_file) if log_file != "-" => {
let file = std::fs::File::create(log_file).unwrap_or_else(|_| {
panic!("fail to create log file: {log_file}")
});
let file_layer = fmt::layer().with_writer(file).with_ansi(false);
subscriber.with(file_layer).init();
}
_ => {
subscriber.init(); }
}
if options.sources.is_empty() {
eprintln!(
"No source of data specified, use --help for more information"
);
std::process::exit(1);
}
let mut redis_connect = match options
.redis_url
.map(|url| redis::Client::open(url).unwrap())
{
Some(c) => Some(
c.get_multiplexed_async_connection()
.await
.expect("Unable to connect to the Redis server"),
),
None => None,
};
let redis_topic = options.redis_topic.unwrap_or("jet1090".to_string());
let redis_retry_interval =
Duration::from_secs(options.redis_retry_interval.unwrap_or(5));
let filters = filters::Filters {
df_filter: options
.df_filter
.map(|df| df.into_iter().map(|v| format!("{v}")).collect()),
aircraft_filter: options.aircraft_filter,
};
let file = if let Some(output_path) = options.output {
let output_path = expanduser(PathBuf::from(output_path));
Some(
fs::OpenOptions::new()
.append(true)
.create(true)
.open(output_path)
.await?,
)
} else {
None
};
let file = Arc::new(Mutex::new(file));
let aircraftdb = aircraft::aircraft().await;
let _awake = match options.prevent_sleep {
true => Some(
keepawake::Builder::default()
.display(false)
.idle(true)
.sleep(true)
.reason("jet1090 decoding in progress")
.app_name("jet1090")
.app_reverse_domain("io.github.jet1090")
.create()?,
),
false => None,
};
let mut aircraft: BTreeMap<ICAO, AircraftState> = BTreeMap::new();
let terminal = if options.interactive {
Some(tui::init()?)
} else {
None
};
let _terminal_guard = if terminal.is_some() {
Some(tui::TerminalGuard)
} else {
None
};
let width = if let Some(terminal) = &terminal {
terminal.size()?.width
} else {
0
};
let mut events = if terminal.is_some() {
Some(tui::EventHandler::new(width))
} else {
None
};
let mut references = BTreeMap::<u64, Option<Position>>::new();
let mut sensors = BTreeMap::<u64, Sensor>::new();
for source in options.sources.iter() {
for sensor in sensor::sensors(source).await {
references.insert(sensor.serial, sensor.reference);
sensors.insert(sensor.serial, sensor);
}
}
let shared = Arc::new(SharedState::new(sensors));
let shared_dec = shared.clone();
let shared_web = shared.clone();
let shared_exp = shared.clone();
let mut quit_rx = shared_dec.quit_tx.subscribe();
if terminal.is_some() {
let shared_signal = shared.clone();
#[cfg(unix)]
tokio::spawn(async move {
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())
.expect("Failed to create SIGINT handler");
let mut sigterm = signal(SignalKind::terminate())
.expect("Failed to create SIGTERM handler");
let mut sighup = signal(SignalKind::hangup())
.expect("Failed to create SIGHUP handler");
tokio::select! {
_ = sigint.recv() => {},
_ = sigterm.recv() => {},
_ = sighup.recv() => {},
}
tui::restore().ok();
shared_signal.request_quit();
});
#[cfg(windows)]
tokio::spawn(async move {
use tokio::signal::windows::{
ctrl_c, ctrl_close, ctrl_logoff, ctrl_shutdown,
};
let mut sig_c = ctrl_c().expect("ctrl_c");
let mut sig_close = ctrl_close().expect("ctrl_close");
let mut sig_shutdown = ctrl_shutdown().expect("ctrl_shutdown");
let mut sig_logoff = ctrl_logoff().expect("ctrl_logoff");
tokio::select! {
_ = sig_c.recv() => {},
_ = sig_close.recv() => {},
_ = sig_shutdown.recv() => {},
_ = sig_logoff.recv() => {},
}
tui::restore().ok();
shared_signal.request_quit();
});
}
let app_tui = Arc::new(Mutex::new(Jet1090 {
items: Vec::new(),
state: TableState::default().with_selected(0),
scroll_state: ScrollbarState::new(0),
sort_key: SortKey::default(),
sort_asc: false,
width,
is_search_mode: false,
search_query: "".to_string(),
interactive_expire: options.interactive_expire.unwrap_or(30),
flags: options.flags,
}));
if let Some(mut terminal) = terminal {
let app_tui_task = app_tui.clone();
let shared_tui = shared.clone();
let mut events =
events.take().expect("event handler in interactive mode");
tokio::spawn(async move {
loop {
if let Ok(event) = events.next().await {
update(&mut app_tui_task.lock().await, event, &shared_tui)?;
}
let mut app = app_tui_task.lock().await;
if shared_tui.should_quit.load(Ordering::Relaxed) {
break;
}
if shared_tui.should_clear.swap(false, Ordering::Relaxed) {
terminal.clear()?;
}
let state_vectors = shared_tui.state_vectors.read().await;
terminal.draw(|frame| {
table::build_table(
frame,
&mut app,
&shared_tui,
&state_vectors,
)
})?;
drop(state_vectors); }
tui::restore()
});
}
if let Some(minutes) = options.history_expire {
if minutes > 0 {
tokio::spawn(expire_aircraft(shared_exp.clone(), minutes));
}
}
if let Some(port) = options.serve_port {
tokio::spawn(serve_web_api(shared_web, port));
}
if file.lock().await.is_some() {
let file_flush = file.clone();
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(1)).await;
if let Some(f) = file_flush.lock().await.as_mut() {
let _ = f.flush().await;
}
}
});
}
let multiplier = references.len();
let (tx, mut rx) = tokio::sync::mpsc::channel(100 * multiplier + 1);
let (tx_dedup, mut rx_dedup) =
tokio::sync::mpsc::channel(100 * multiplier + 1);
let (shutdown_tx, _shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
let mut source_handles = Vec::new();
for source in options.sources.into_iter() {
let serial = source.serial();
let tx_copy = tx.clone();
let source_name = source.name.clone();
let shutdown_rx = shutdown_tx.subscribe();
let handle = source.receiver(tx_copy, serial, source_name, shutdown_rx);
source_handles.push(handle);
}
if !options.no_deduplication {
tokio::spawn(async move {
dedup::deduplicate_messages(
rx,
tx_dedup,
options.deduplication.unwrap_or(450),
options.reorder_window.unwrap_or(200),
)
.await;
});
} else {
tokio::spawn(async move {
while let Some(mut msg) = rx.recv().await {
let start = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime before unix epoch")
.as_secs_f64();
if let Ok((_, decoded)) = Message::from_bytes((&msg.frame, 0)) {
msg.decode_time = Some(
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime before unix epoch")
.as_secs_f64()
- start,
);
msg.message = Some(decoded);
if tx_dedup.send(msg).await.is_err() {
break;
}
}
}
});
}
let update_reference = match options.update_position {
true => Some(Box::new(|pos: &AirbornePosition| {
pos.alt.is_some_and(|alt| alt < 5000)
}) as Box<dyn Fn(&AirbornePosition) -> bool>),
false => None,
};
let mut last_reference_update: f64 = 0.0;
let mut first_msg = true;
loop {
if *quit_rx.borrow() {
break;
}
let maybe_msg = tokio::select! {
_ = quit_rx.changed() => None,
maybe_msg = rx_dedup.recv() => maybe_msg,
};
let Some(mut msg) = maybe_msg else {
break;
};
if first_msg {
shared_dec.should_clear.store(true, Ordering::Relaxed);
first_msg = false;
}
if msg.timestamp - last_reference_update > 300.0 {
for (_, reference) in references.iter_mut() {
rs1090::decode::cpr::update_global_reference(
&aircraft,
reference,
msg.timestamp,
);
}
last_reference_update = msg.timestamp;
}
if let Some(message) = &mut msg.message {
match &mut message.df {
ExtendedSquitterADSB(adsb) => match adsb.message {
ME::BDS05 { .. } | ME::BDS06 { .. } => {
let serial = msg
.metadata
.first()
.map(|meta| meta.serial)
.unwrap();
let mut reference = references[&serial];
decode_position(
&mut adsb.message,
msg.timestamp,
&adsb.icao24,
&mut aircraft,
&mut reference,
&update_reference,
);
if options.update_position {
for meta in &msg.metadata {
let _ =
references.insert(meta.serial, reference);
}
}
}
_ => {}
},
ExtendedSquitterTisB { cf, .. } => match cf.me {
ME::BDS05 { .. } | ME::BDS06 { .. } => {
let serial = msg
.metadata
.first()
.map(|meta| meta.serial)
.unwrap();
let mut reference = references[&serial];
decode_position(
&mut cf.me,
msg.timestamp,
&cf.aa,
&mut aircraft,
&mut reference,
&update_reference,
)
}
_ => {}
},
_ => {}
}
};
if let Some(message) = &mut msg.message {
MessageProcessor::new(message, &mut aircraft)
.sanitize_commb()
.record_observed_bds()
.finish();
}
snapshot::update_snapshot(
&shared_dec,
&mut msg,
&aircraftdb,
&aircraft,
)
.await;
let is_in = filters::Filters::is_in(&filters, &msg);
if let Ok(json) = serde_json::to_string(&msg) {
if is_in {
if options.verbose {
println!("{json}");
}
if let Some(f) = file.lock().await.as_mut() {
f.write_all(json.as_bytes()).await?;
f.write_all("\n".as_bytes()).await?;
}
if let Some(c) = &mut redis_connect {
publish_with_retry(
c,
redis_topic.as_str(),
&json,
redis_retry_interval,
)
.await;
}
}
}
match options.history_expire {
Some(0) => (),
_ => {
if is_in {
snapshot::store_history(&shared_dec, msg, &aircraftdb).await
}
}
}
if shared_dec.should_quit.load(Ordering::Relaxed) {
break;
}
}
let _ = shutdown_tx.send(());
let timeout = Duration::from_secs(2);
for handle in source_handles {
let _ = tokio::time::timeout(timeout, handle).await;
}
Ok(())
}
async fn expire_aircraft(shared: Arc<SharedState>, minutes: u64) {
loop {
sleep(Duration::from_secs(60)).await;
{
let mut state_vectors = shared.state_vectors.write().await;
let now = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime before unix epoch")
.as_secs();
let remove_keys = state_vectors
.iter()
.filter(|(_key, value)| now > value.cur.lastseen + minutes * 60)
.map(|(key, _)| key.to_string())
.collect::<Vec<String>>();
for key in remove_keys {
state_vectors.remove(&key);
}
let _ = state_vectors
.iter_mut()
.map(|(_key, value)| {
value.hist.retain(|elt| {
now < (elt.timestamp as u64) + minutes * 60
})
})
.collect::<Vec<()>>();
}
}
}
#[derive(Debug, Default)]
pub struct Jet1090 {
state: TableState,
items: Vec<String>,
scroll_state: ScrollbarState,
sort_key: SortKey,
sort_asc: bool,
width: u16,
is_search_mode: bool,
search_query: String,
interactive_expire: u64,
flags: bool,
}
#[derive(Debug)]
pub struct SharedState {
state_vectors: Arc<RwLock<BTreeMap<String, snapshot::StateVectors>>>,
sensors: BTreeMap<u64, Sensor>,
should_quit: Arc<AtomicBool>,
quit_tx: watch::Sender<bool>,
should_clear: Arc<AtomicBool>,
}
impl SharedState {
fn new(sensors: BTreeMap<u64, Sensor>) -> Self {
let (quit_tx, _quit_rx) = watch::channel(false);
Self {
state_vectors: Arc::new(RwLock::new(BTreeMap::new())),
sensors,
should_quit: Arc::new(AtomicBool::new(false)),
quit_tx,
should_clear: Arc::new(AtomicBool::new(false)),
}
}
fn request_quit(&self) {
self.should_quit.store(true, Ordering::Relaxed);
let _ = self.quit_tx.send(true);
}
}
#[derive(Debug, Default, PartialEq)]
pub enum SortKey {
CALLSIGN,
ALTITUDE,
VRATE,
#[default]
COUNT,
FIRST,
LAST,
}
fn update(
jet1090: &mut tokio::sync::MutexGuard<Jet1090>,
event: Event,
shared: &SharedState,
) -> std::io::Result<()> {
match event {
Event::Key(key) => {
use KeyCode::*;
match (jet1090.is_search_mode, key.code) {
(true, Char(c)) => jet1090.search_query.push(c),
(true, Backspace) => {
jet1090.search_query.pop();
}
(true, Enter) => jet1090.is_search_mode = false,
(true, Esc) => {
jet1090.is_search_mode = false;
jet1090.search_query = "".to_string()
}
(false, Char('j')) | (_, Down) => jet1090.next(),
(false, Char('k')) | (_, Up) => jet1090.previous(),
(false, Char('g')) | (_, PageUp) | (_, Home) => jet1090.home(),
(false, Char('q')) | (false, Esc) => shared.request_quit(),
(false, Char('a')) => jet1090.sort_key = SortKey::ALTITUDE,
(false, Char('c')) => jet1090.sort_key = SortKey::CALLSIGN,
(false, Char('v')) => jet1090.sort_key = SortKey::VRATE,
(false, Char('.')) => jet1090.sort_key = SortKey::COUNT,
(false, Char('f')) => jet1090.sort_key = SortKey::FIRST,
(false, Char('l')) => jet1090.sort_key = SortKey::LAST,
(false, Char('-')) => jet1090.sort_asc = !jet1090.sort_asc,
(false, Char('/')) => jet1090.is_search_mode = true,
_ => {}
}
}
Event::Tick(size) => jet1090.width = size,
_ => {}
}
Ok(())
}
impl Jet1090 {
pub fn next(&mut self) {
let i = match self.state.selected() {
Some(i) => {
if i >= self.items.len() - 1 {
0
} else {
i + 1
}
}
None => 0,
};
self.state.select(Some(i));
self.scroll_state = self.scroll_state.position(i);
}
pub fn previous(&mut self) {
let i = match self.state.selected() {
Some(i) => {
if i == 0 {
self.items.len() - 1
} else {
i - 1
}
}
None => 0,
};
self.state.select(Some(i));
self.scroll_state = self.scroll_state.position(i);
}
pub fn home(&mut self) {
self.state.select(Some(0));
self.scroll_state = self.scroll_state.position(0);
}
}
fn print_completions<G: Generator>(gen: G, cmd: &mut Command) {
generate(gen, cmd, cmd.get_name().to_string(), &mut io::stdout());
}
async fn publish_with_retry(
connection: &mut redis::aio::MultiplexedConnection,
topic: &str,
payload: &str,
retry_interval: Duration,
) {
loop {
match connection.publish::<_, _, ()>(topic, payload).await {
Ok(()) => break,
Err(err) => {
if retry_interval.is_zero() {
warn!(error = %err, "Redis publish failed; retries disabled");
break;
}
warn!(
error = %err,
retry_seconds = retry_interval.as_secs(),
"Redis publish failed; retrying"
);
sleep(retry_interval).await;
}
}
}
}
#[cfg(test)]
mod tests {
use crate::Options;
#[test]
fn test_config() {
let options: Options = toml::from_str(
r#"
verbose = false
interactive = true
serve_port = 8080
no_history_expire = true
prevent_sleep = false
update_position = false
[[sources]]
udp = "0.0.0.0:1234"
airport = 'LFBO'
[[sources]]
udp = "0.0.0.0:3456"
latitude = 48.723
longitude = 2.379
"#,
)
.unwrap();
assert!(options.interactive);
assert!(options.history_expire.is_none());
assert_eq!(options.sources.len(), 2);
}
}