use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use rs1090::prelude::*;
#[cfg(feature = "rtlsdr")]
use rs1090::source::rtlsdr;
#[cfg(feature = "sero")]
use rs1090::source::sero;
#[cfg(feature = "ssh")]
use rs1090::source::ssh::{TunnelledTcp, TunnelledWebsocket};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Sender;
use tracing::error;
use url::Url;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AddressStruct {
address: String,
port: u16,
jump: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum AddressPath {
Short(String),
Long(AddressStruct),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WebsocketStruct {
url: String,
jump: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum WebsocketPath {
Short(String),
Long(WebsocketStruct),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Address {
Tcp(AddressPath),
Udp(String),
Websocket(WebsocketPath),
Rtlsdr(Option<String>),
Sero(SeroParams),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Source {
#[serde(flatten)]
pub address: Address,
pub name: Option<String>,
#[serde(flatten)]
pub reference: Option<Position>,
pub altitude: Option<f64>,
}
fn build_serial(input: &str) -> u64 {
let mut hasher = DefaultHasher::new();
input.hash(&mut hasher);
hasher.finish()
}
impl FromStr for Source {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.replace("@", "?"); let default_tcp = Url::parse("tcp://").unwrap();
let url = default_tcp.join(&s).map_err(|e| e.to_string())?;
let address = match url.scheme() {
"tcp" => Address::Tcp(AddressPath::Short(format!(
"{}:{}",
url.host_str().unwrap_or("0.0.0.0"),
match url.host() {
Some(_) => url.port_or_known_default().unwrap_or(10003),
None => {
url.path()
.strip_prefix("/:")
.unwrap()
.parse::<u16>()
.expect("A port number was expected")
}
}
))),
"udp" => Address::Udp(format!(
"{}:{}",
url.host_str().unwrap_or("0.0.0.0"),
url.port_or_known_default().unwrap()
)),
"rtlsdr" => Address::Rtlsdr(url.host_str().map(|s| s.to_string())),
"ws" => Address::Websocket(WebsocketPath::Short(format!(
"ws://{}:{}/{}",
url.host_str().unwrap_or("0.0.0.0"),
url.port_or_known_default().unwrap(),
url.path().strip_prefix("/").unwrap()
))),
_ => return Err("unsupported scheme".to_string()),
};
let mut source = Source {
address,
name: None,
reference: None,
altitude: None,
};
if let Some(query) = url.query() {
source.reference = Position::from_str(query).ok()
};
Ok(source)
}
}
impl Source {
pub fn serial(&self) -> u64 {
match &self.address {
Address::Tcp(address) => {
let name = match address {
AddressPath::Short(s) => s.clone(),
AddressPath::Long(AddressStruct {
address, port, ..
}) => {
format!("{}:{}", address, port)
}
};
build_serial(&name)
}
Address::Udp(name) => build_serial(name),
Address::Websocket(address) => {
let name = match address {
WebsocketPath::Short(s) => s.clone(),
WebsocketPath::Long(WebsocketStruct { url, .. }) => {
url.clone()
}
};
build_serial(&name)
}
Address::Rtlsdr(reference) => {
let name = reference.clone().unwrap_or("rtlsdr".to_string());
build_serial(&name)
}
Address::Sero(_) => 0,
}
}
pub async fn receiver(
&self,
tx: Sender<TimedMessage>,
serial: u64,
name: Option<String>,
) {
match &self.address {
Address::Rtlsdr(args) => {
#[cfg(not(feature = "rtlsdr"))]
{
error!("Compile jet1090 with the rtlsdr feature, {:?} argument ignored", args);
std::process::exit(127);
}
#[cfg(feature = "rtlsdr")]
{
rtlsdr::receiver::<&str>(tx, args.as_deref(), serial, name)
.await
}
}
Address::Sero(sero) => {
#[cfg(not(feature = "sero"))]
{
error!("Compile jet1090 with the sero feature, {:?} argument ignored", sero);
}
#[cfg(feature = "sero")]
{
sero::receiver(sero::SeroClient::from(sero), tx).await
}
}
_ => {
let server_address = match &self.address {
Address::Tcp(address) => match address {
AddressPath::Short(s) => {
beast::BeastSource::Tcp(s.to_owned())
}
#[cfg(not(feature = "ssh"))]
AddressPath::Long(AddressStruct {
address,
port,
..
}) => beast::BeastSource::Tcp(format!(
"{}:{}",
address, port
)),
#[cfg(feature = "ssh")]
AddressPath::Long(AddressStruct {
address,
port,
jump: None,
}) => beast::BeastSource::Tcp(format!(
"{}:{}",
address, port
)),
#[cfg(feature = "ssh")]
AddressPath::Long(AddressStruct {
address,
port,
jump: Some(jump),
}) => beast::BeastSource::TunnelledTcp(TunnelledTcp {
address: address.to_owned(),
port: *port,
jump: jump.to_owned(),
}),
},
Address::Udp(s) => beast::BeastSource::Udp(s.to_owned()),
Address::Websocket(address) => match address {
WebsocketPath::Short(s) => {
beast::BeastSource::Websocket(s.to_owned())
}
#[cfg(not(feature = "ssh"))]
WebsocketPath::Long(WebsocketStruct {
url, ..
}) => beast::BeastSource::Websocket(url.to_owned()),
#[cfg(feature = "ssh")]
WebsocketPath::Long(WebsocketStruct {
url,
jump: None,
..
}) => beast::BeastSource::Websocket(url.to_owned()),
#[cfg(feature = "ssh")]
WebsocketPath::Long(WebsocketStruct {
url,
jump: Some(jump),
}) => {
let parsed_url = Url::parse(url).unwrap();
beast::BeastSource::TunnelledWebsocket(
TunnelledWebsocket {
address: parsed_url
.host_str()
.unwrap()
.to_owned(),
port: parsed_url
.port_or_known_default()
.unwrap(),
url: url.to_owned(),
jump: jump.to_owned(),
},
)
}
},
_ => unreachable!(),
};
if let Err(e) =
beast::receiver(server_address, tx, serial, name).await
{
error!("{}", e.to_string());
}
}
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SeroParams {
pub token: String,
pub df_filter: Option<Vec<u32>>,
pub aircraft_filter: Option<Vec<u32>>,
pub sensor_filter: Option<Vec<String>>,
pub jump: Option<String>,
}
#[cfg(feature = "sero")]
impl From<&SeroParams> for sero::SeroClient {
fn from(value: &SeroParams) -> Self {
sero::SeroClient {
token: value.token.clone(),
df_filter: value.df_filter.clone().unwrap_or_default(),
aircraft_filter: value.aircraft_filter.clone().unwrap_or_default(),
sensor_filter: value.sensor_filter.clone().unwrap_or_default(),
jump: value.jump.clone(),
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_source() {
let source = Source::from_str("rtlsdr:");
assert!(source.is_ok());
if let Ok(Source { address, .. }) = source {
assert_eq!(address, Address::Rtlsdr(None));
}
let source = Source::from_str("rtlsdr://serial=00000001");
assert!(source.is_ok());
if let Ok(Source { address, .. }) = source {
assert_eq!(
address,
Address::Rtlsdr(Some("serial=00000001".to_string()))
);
}
let source = Source::from_str("rtlsdr:@LFBO");
assert!(source.is_ok());
if let Ok(Source {
address,
name,
reference: Some(pos),
..
}) = source
{
assert_eq!(address, Address::Rtlsdr(None));
assert_eq!(name, None);
assert_eq!(pos.latitude, 43.628101);
assert_eq!(pos.longitude, 1.367263);
}
let source = Source::from_str("http://default");
assert!(source.is_err());
let source = Source::from_str(":4003");
assert!(source.is_ok());
if let Ok(Source {
address: Address::Tcp(path),
name,
reference,
..
}) = source
{
assert_eq!(path, AddressPath::Short("0.0.0.0:4003".to_string()));
assert_eq!(name, None);
assert_eq!(reference, None);
}
let source = Source::from_str(":4003?LFBO");
assert!(source.is_ok());
if let Ok(Source {
address: Address::Tcp(path),
name,
reference: Some(pos),
..
}) = source
{
assert_eq!(path, AddressPath::Short("0.0.0.0:4003".to_string()));
assert_eq!(name, None);
assert_eq!(pos.latitude, 43.628101);
assert_eq!(pos.longitude, 1.367263);
}
let source = Source::from_str("ws://1.2.3.4:4003/get?LFBO");
assert!(source.is_ok());
if let Ok(Source {
address,
name,
reference: Some(pos),
..
}) = source
{
assert_eq!(
address,
Address::Websocket(WebsocketPath::Short(
"ws://1.2.3.4:4003/get".to_string()
))
);
assert_eq!(name, None);
assert_eq!(pos.latitude, 43.628101);
assert_eq!(pos.longitude, 1.367263);
}
}
}