use std::{fmt, time::Duration};
#[macro_use]
extern crate serde_derive;
/// Configuration record describing one agency's realtime feeds and how to
/// authenticate against them.
///
/// Two values compare equal when their [`onetrip`](AgencyInfo::onetrip)
/// fields match; the `PartialEq` impl ignores every other field.
///
/// Field order is significant for positional serialization formats, so keep
/// it stable.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AgencyInfo {
    /// Identifier of the agency; the only field consulted by `PartialEq`.
    pub onetrip: String,
    /// Location of the vehicle-positions feed (presumably a URL — TODO confirm).
    pub realtime_vehicle_positions: String,
    /// Location of the trip-updates feed (presumably a URL — TODO confirm).
    pub realtime_trip_updates: String,
    /// Location of the alerts feed (presumably a URL — TODO confirm).
    pub realtime_alerts: String,
    /// Whether the feeds need credentials (assumed from the name — confirm).
    pub has_auth: bool,
    /// Name of the authentication scheme.
    ///
    /// NOTE(review): `fetchurl` and `make_url` in this file compare their
    /// `auth_type` argument against `"header"` and `"query_param"`; presumably
    /// those are the values stored here — confirm against callers.
    pub auth_type: String,
    /// Header name used for authentication (presumably only meaningful for the
    /// `"header"` scheme — confirm).
    pub auth_header: String,
    /// Secret or API key used for authentication.
    pub auth_password: String,
    /// Time between fetches; the unit is not visible in this file — TODO confirm.
    pub fetch_interval: f32,
    /// Optional list of additional strings; their purpose is not visible in
    /// this file (the name suggests several credentials) — TODO confirm.
    pub multiauth: Option<Vec<String>>,
}
/// Agencies are identified by `onetrip`: two records are equal whenever that
/// single field matches, regardless of URLs, credentials or interval.
impl PartialEq for AgencyInfo {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.onetrip.as_str(), other.onetrip.as_str());
        lhs == rhs
    }
}
/// Kind of feed that can be requested for an agency.
///
/// The `Display` impl renders each variant as its lowercase name.
/// Variant order is significant for positional serialization formats, so keep
/// it stable.
#[derive(Debug, Serialize, Deserialize)]
pub enum FeedType {
    /// Rendered as `"trips"`.
    Trips,
    /// Rendered as `"vehicles"`.
    Vehicles,
    /// Rendered as `"alerts"`.
    Alerts,
    /// Rendered as `"shapes"`.
    Shapes,
}
/// Renders a [`FeedType`] as its lowercase name (`trips`, `vehicles`,
/// `alerts`, `shapes`).
impl fmt::Display for FeedType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written straight to the formatter: width/padding flags are not applied.
        f.write_str(match self {
            FeedType::Trips => "trips",
            FeedType::Vehicles => "vehicles",
            FeedType::Alerts => "alerts",
            FeedType::Shapes => "shapes",
        })
    }
}
/// Bundle of the three optional per-agency feed addresses.
///
/// Each field is `None` when no address is available for that feed
/// (presumably produced by `make_url` — confirm against callers).
#[derive(Debug)]
pub struct Agencyurls {
    /// Address of the vehicles feed, if any.
    pub vehicles: Option<String>,
    /// Address of the trips feed, if any.
    pub trips: Option<String>,
    /// Address of the alerts feed, if any.
    pub alerts: Option<String>,
}
/// RPC interface of the ingest service, declared through the
/// `#[tarpc::service]` attribute macro.
///
/// NOTE(review): none of the `String` / `Vec<u8>` return values has a schema
/// visible in this file — confirm their format against the server
/// implementation before relying on the descriptions below.
#[tarpc::service]
pub trait IngestInfo {
/// Takes no arguments and returns a `String`; presumably a listing of the
/// currently known agencies — TODO confirm.
async fn agencies() -> String;
/// Submits an [`AgencyInfo`] record and returns a `String`; presumably a
/// status message — TODO confirm.
async fn addagency(agency: AgencyInfo) -> String;
/// Takes an agency given as a `String` (presumably its `onetrip`
/// identifier — confirm) and returns a `String` result.
async fn removeagency(agency: String) -> String;
/// Requests the data of one [`FeedType`] for the given agency and returns it
/// as raw bytes (presumably the stored feed payload — confirm).
async fn getagency(agency: String, feedtype: FeedType) -> Vec<u8>;
}
/// Downloads the body of `url` and returns it as raw bytes.
///
/// # Parameters
/// * `url` – address to fetch. `None`, or any address containing the
///   substring `"kactus"`, is skipped and yields `None` without a request.
/// * `auth_header` / `auth_password` – header name and value attached to the
///   request when `auth_type` is exactly `"header"`; ignored otherwise.
/// * `auth_type` – authentication scheme; only `"header"` is acted on here.
/// * `client` – the `reqwest` client used to issue the GET request.
/// * `timeoutforfetch` – request timeout in milliseconds.
///
/// # Returns
/// `Some(bytes)` when the server answers with a success status and the body
/// can be read. Every failure (skipped URL, transport error, non-success
/// status, body read error) yields `None`; the latter three are logged to
/// stdout.
pub async fn fetchurl(
    url: &Option<String>,
    auth_header: &String,
    auth_type: &String,
    auth_password: &String,
    client: &reqwest::Client,
    timeoutforfetch: u64,
) -> Option<Vec<u8>> {
    // Borrow the URL once instead of cloning the whole `Option<String>` at
    // every use; bail out early for missing or deliberately skipped URLs.
    let url = url.as_deref().filter(|u| !u.contains("kactus"))?;

    let mut req = client.get(url);
    if auth_type == "header" {
        req = req.header(auth_header, auth_password);
    }

    let resp = match req
        .timeout(Duration::from_millis(timeoutforfetch))
        .send()
        .await
    {
        Ok(resp) => resp,
        Err(e) => {
            println!("error fetching url: {:?}", e);
            return None;
        }
    };

    if !resp.status().is_success() {
        println!("{}:{:?}", url, resp.status());
        return None;
    }

    match resp.bytes().await {
        Ok(body) => Some(body.to_vec()),
        Err(e) => {
            // Previously swallowed silently; log it like the other failures.
            println!("error reading response body from {}: {:?}", url, e);
            None
        }
    }
}
/// Builds the final feed URL from a configured URL template.
///
/// # Parameters
/// * `url` – configured URL; an empty string means "no feed".
/// * `auth_type` – when exactly `"query_param"` (and the password is
///   non-empty) every occurrence of the literal `PASSWORD` in `url` is
///   replaced by `auth_password`.
/// * `auth_header` – accepted for signature symmetry; not used here.
/// * `auth_password` – value substituted for `PASSWORD`.
///
/// # Returns
/// `None` for an empty `url`, otherwise `Some` with the (possibly
/// substituted) URL.
pub fn make_url(
    url: &String,
    auth_type: &String,
    auth_header: &String,
    auth_password: &String,
) -> Option<String> {
    if url.is_empty() {
        return None;
    }

    let substitute_password = auth_type == "query_param" && !auth_password.is_empty();
    let resolved = if substitute_password {
        url.replace("PASSWORD", auth_password)
    } else {
        url.clone()
    };
    Some(resolved)
}
/// Decodes `bytes` as a protobuf-encoded [`gtfs_rt::FeedMessage`].
///
/// # Errors
/// Returns the `prost` decode error, boxed as `Box<dyn std::error::Error>`,
/// when the bytes are not a valid `FeedMessage`.
pub fn parse_protobuf_message(
    bytes: &[u8],
) -> Result<gtfs_rt::FeedMessage, Box<dyn std::error::Error>> {
    <gtfs_rt::FeedMessage as prost::Message>::decode(bytes).map_err(Into::into)
}
pub mod insert {
use prost::Message;
use redis::{Commands, Connection};
use std::{fs::File, io::{self, Write}, time::{SystemTime, UNIX_EPOCH}};
pub fn insert_gtfs_rt_bytes(
con: &mut Connection,
bytes: &Vec<u8>,
onetrip: &str,
category: &str,
) {
let now_millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string();
let key: String = format!("gtfsrt|{}|{}", &onetrip, &category);
let _: () = con.set(key.clone(), bytes).unwrap();
inserttimes(con, &onetrip, &category, &now_millis);
}
pub fn persist_gtfs_rt_bytes(
bytes: &Vec<u8>,
onetrip: &str,
category: &str,
) -> io::Result<()> {
let now_millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string();
let file_path = format!("./gtfs-rt/{}-{}-{}", onetrip, category, now_millis);
let mut file = File::create(file_path)?;
file.write_all(&bytes)
}
pub fn insert_gtfs_rt(
con: &mut Connection,
data: >fs_rt::FeedMessage,
onetrip: &str,
category: &str,
) {
let now_millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.to_string();
let bytes: Vec<u8> = data.to_owned().encode_to_vec();
let _: () = con
.set(format!("gtfsrt|{}|{}", &onetrip, &category), bytes.to_vec())
.unwrap();
inserttimes(con, &onetrip, &category, &now_millis);
}
fn inserttimes(con: &mut Connection, onetrip: &str, category: &str, now_millis: &String) {
let _: () = con
.set(
format!("gtfsrttime|{}|{}", &onetrip, &category),
&now_millis,
)
.unwrap();
let _: () = con
.set(format!("gtfsrtexists|{}", &onetrip), &now_millis)
.unwrap();
}
}
/// Hand-off of fetched realtime data for one agency.
pub mod aspen {
    /// Feed identifiers for which the vehicles flag is forced to `true`,
    /// whatever the caller passed in.
    const GENERATING_VEHICLES: [&str; 4] = [
        "f-mta~nyc~rt~mnr",
        "f-mta~nyc~rt~lirr",
        "f-roamtransit~rt",
        "f-bart~rt",
    ];

    /// Entry point for forwarding one agency's fetched feed results.
    ///
    /// As written, the function only derives the effective `vehicles_exist`
    /// flag (the caller's value, or `true` when `agency` is one of the listed
    /// feed identifiers) and then returns; none of the other arguments is
    /// used yet.
    pub async fn send_to_aspen(
        agency: &str,
        vehicles_result: &Option<Vec<u8>>,
        trips_result: &Option<Vec<u8>>,
        alerts_result: &Option<Vec<u8>>,
        vehicles_exist: bool,
        trips_exist: bool,
        alerts_exist: bool,
        useexistingdata: bool,
    ) {
        let is_generating_feed = GENERATING_VEHICLES.iter().any(|feed| *feed == agency);
        let vehicles_exist = vehicles_exist || is_generating_feed;
    }
}