use std::error;
use futures_util::{StreamExt, SinkExt};
use tokio_tungstenite::connect_async;
use std::sync::mpsc::{channel, Receiver};
#[macro_use] extern crate serde_derive;
fn default_timestamp() -> f32 {
0.0
}
fn default_unknown_string() -> String {
"unknown".to_string()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RisResponseData {
#[serde(default="default_timestamp")]
timestamp: f32,
#[serde(default="default_unknown_string")]
peer: String,
#[serde(default="default_unknown_string")]
peer_asn: String,
#[serde(default="default_unknown_string")]
id: String,
#[serde(default="default_unknown_string")]
host: String,
#[serde(rename = "type")]
#[serde(default="default_unknown_string")]
data_type: String
}
impl Default for RisResponseData {
fn default() -> RisResponseData{
RisResponseData {
timestamp: default_timestamp(),
peer: default_unknown_string(),
peer_asn: default_unknown_string(),
id: default_unknown_string(),
host: default_unknown_string(),
data_type: default_unknown_string(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RisRequestData {
host: Option<String>,
#[serde(rename = "type")]
data_type: Option<String>,
require: Option<String>,
path: Option<Vec<u32>>
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RisResponse {
#[serde(default="default_unknown_string")]
#[serde(rename = "type")]
message_type: String,
data: RisResponseData,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RisRequest {
#[serde(rename = "type")]
message_type: String,
data: Option<RisRequestData>,
}
pub struct RisClient {
host: String,
client_id: String,
}
impl RisClient {
pub fn new(host: String, client_id: String) -> Result<RisClient, Box<dyn error::Error>> {
Ok(RisClient {
host,
client_id,
})
}
pub fn default() -> Result<RisClient, Box<dyn error::Error>> {
Ok(RisClient {
host: "ris-live.ripe.net".to_string(),
client_id: "rust-risclient".to_string(),
})
}
pub async fn stream_custom(&mut self, host: Option<String>, data_type: Option<String>, require: Option<String>, path: Option<Vec<u32>>) -> Result<Receiver<RisResponse>, Box<dyn error::Error>> {
let url = format!("wss://{}/v1/ws/?client={}", self.host, self.client_id);
let handle = connect_async(url).await;
match handle {
Ok(handle) => {
let request = RisRequest {
message_type: "ris_subscribe".to_string(),
data: Some(RisRequestData {
host,
data_type,
require,
path,
})
};
let (mut tx, _) = handle;
let message = match serde_json::to_string(&request) {
Ok(message) => message,
Err(e) => return Err(Box::new(e))
};
match tx.send(message.into()).await {
Ok(_) => {
let (ctx, crx) = channel();
let _result = tokio::spawn(async move {
while let Some(msg)= tx.next().await {
match msg {
Ok(msg) => {
let message = msg.to_string();
let data: RisResponse = match serde_json::from_str(&message) {
Ok(data) => data,
Err(ref e) if e.is_eof() => continue,
Err(e) => panic!("failed decoding message: {:?}, '{}'", e, message),
};
match ctx.send(data) {
Ok(_) => continue,
Err(e) => panic!("failed to send decoded message to channel: {:?}", e)
}
},
Err(e) => panic!("failed to decode message: {:?}", e),
}
}
});
Ok(crx)
},
Err(e) => Err(Box::new(e))
}
},
Err(e) => Err(Box::new(e))
}
}
pub async fn stream(&mut self) -> Result<Receiver<RisResponse>, Box<dyn error::Error>> {
self.stream_custom(None, None, None, None).await
}
}