#[macro_use]
extern crate tracing;
use async_std::{
channel::{unbounded, Receiver, Sender},
net::TcpStream,
task,
};
pub use types::{api::Receive_Type, Error, Identity, Message, Result, TimePair};
use types::{
api::{
self, ApiMessageEnum,
Peers_Type::{DISCOVER, RESP},
Setup_Type::ACK,
},
encode_message, parse_message, read_with_length, write_with_length,
};
/// Client-side handle for talking to a Ratman daemon over a TCP socket.
///
/// Instances are created with [`RatmanIpc::connect`], [`RatmanIpc::default`],
/// [`RatmanIpc::default_with_addr`] or [`RatmanIpc::anonymous`].
#[derive(Clone)]
pub struct RatmanIpc {
/// Stream connected to the daemon; every request is written to a clone of it.
socket: TcpStream,
/// Address reported by [`RatmanIpc::address`] and passed as the first
/// argument of `Message::new` when sending (presumably the sender — confirm
/// against the `types` crate).
addr: Identity,
/// Receiving end of the channel polled by [`RatmanIpc::next`].
recv: Receiver<(Receive_Type, Message)>,
/// Receiving end of the channel polled by [`RatmanIpc::discover`].
disc: Receiver<Identity>,
}
impl RatmanIpc {
    /// Socket address used by [`RatmanIpc::default`] and
    /// [`RatmanIpc::default_with_addr`].
    const DEFAULT_SOCKET: &'static str = "127.0.0.1:9020";

    /// Connect to the daemon at `127.0.0.1:9020` without providing an address.
    ///
    /// Shorthand for `connect("127.0.0.1:9020", None)`; see
    /// [`RatmanIpc::connect`] for details and errors.
    pub async fn default() -> Result<Self> {
        Self::connect(Self::DEFAULT_SOCKET, None).await
    }

    /// Connect to the daemon at `127.0.0.1:9020` using an existing address.
    ///
    /// Shorthand for `connect("127.0.0.1:9020", Some(addr))`; see
    /// [`RatmanIpc::connect`] for details and errors.
    pub async fn default_with_addr(addr: Identity) -> Result<Self> {
        Self::connect(Self::DEFAULT_SOCKET, Some(addr)).await
    }

    /// Connect to a daemon and register this client with it.
    ///
    /// Sends a setup message (`api::online` when `addr` is given,
    /// `api::online_init` otherwise) and waits for a setup `ACK` reply. The
    /// handle's address is the id carried by the `ACK` when present, and
    /// `addr` otherwise. After the handshake a background task is spawned that
    /// reads from the socket and feeds [`RatmanIpc::next`] and
    /// [`RatmanIpc::discover`].
    ///
    /// # Errors
    ///
    /// Returns an error when the TCP connection, message encoding or socket
    /// write fails, when the daemon's reply is not a setup `ACK`, or when
    /// neither the `ACK` nor the caller supplies an address.
    pub async fn connect(socket_addr: &str, addr: Option<Identity>) -> Result<RatmanIpc> {
        let mut socket = TcpStream::connect(socket_addr).await?;

        let online_msg = api::api_setup(match addr {
            Some(addr) => api::online(addr, vec![]),
            None => api::online_init(),
        });
        info!("Sending introduction message!");
        write_with_length(&mut socket, &encode_message(online_msg)?).await?;

        trace!("Waiting for ACK message!");
        let reply = parse_message(&mut socket).await.map(|m| m.inner);
        let addr = match reply {
            Ok(Some(ApiMessageEnum::setup(s))) if s.field_type == ACK => {
                // Prefer the id sent by the daemon; fall back to the one the
                // caller registered with.
                let assigned = s
                    ._id
                    .as_ref()
                    .map(|_| Identity::from_bytes(s.get_id()))
                    .or(addr);
                match assigned {
                    Some(id) => id,
                    None => return Self::protocol_error("failed to initialise new address!"),
                }
            }
            // A read/parse failure or any other message is a failed handshake,
            // not a programming error, so report it instead of panicking.
            _ => return Self::protocol_error("expected a setup ACK message from the daemon"),
        };
        debug!("IPC client initialisation done!");

        let (tx, recv) = unbounded();
        let (dtx, disc) = unbounded();
        task::spawn(run_receive(socket.clone(), tx, dtx));

        Ok(Self {
            socket,
            addr,
            recv,
            disc,
        })
    }

    /// Connect to a daemon with an `api::anonymous` setup message.
    ///
    /// No reply is awaited and no receive task is spawned. The handle gets a
    /// random placeholder address, and the sending halves of both channels are
    /// dropped immediately, so [`RatmanIpc::next`] and
    /// [`RatmanIpc::discover`] have nothing to yield on such a handle.
    ///
    /// # Errors
    ///
    /// Returns an error when the TCP connection, message encoding or socket
    /// write fails.
    pub async fn anonymous(socket_addr: &str) -> Result<Self> {
        let mut socket = TcpStream::connect(socket_addr).await?;

        let introduction = api::api_setup(api::anonymous());
        write_with_length(&mut socket, &encode_message(introduction)?).await?;

        let addr = Identity::random();
        let (_, recv) = unbounded();
        let (_, disc) = unbounded();
        Ok(Self {
            socket,
            addr,
            recv,
            disc,
        })
    }

    /// Return the address associated with this handle.
    pub fn address(&self) -> Identity {
        self.addr
    }

    /// Send `payload` to a single `recipient` via `api::send_default`.
    ///
    /// # Errors
    ///
    /// Returns an error when encoding the message or writing it to the socket
    /// fails.
    pub async fn send_to(&self, recipient: Identity, payload: Vec<u8>) -> Result<()> {
        let message = Message::new(self.addr, vec![recipient], payload, vec![]);
        let msg = api::api_send(api::send_default(message.into()));
        write_with_length(&mut self.socket.clone(), &encode_message(msg)?).await?;
        Ok(())
    }

    /// Send `payload` via `api::send_flood`, with an empty recipient list.
    ///
    /// # Errors
    ///
    /// Returns an error when encoding the message or writing it to the socket
    /// fails.
    pub async fn flood(&self, payload: Vec<u8>) -> Result<()> {
        let message = Message::new(self.addr, vec![], payload, vec![]);
        let msg = api::api_send(api::send_flood(message.into()));
        write_with_length(&mut self.socket.clone(), &encode_message(msg)?).await?;
        Ok(())
    }

    /// Wait for the next message forwarded by the receive task.
    ///
    /// Returns `None` when the underlying channel reports an error (it is
    /// closed and empty).
    pub async fn next(&self) -> Option<(Receive_Type, Message)> {
        self.recv.recv().await.ok()
    }

    /// Wait for the next discovered peer forwarded by the receive task.
    ///
    /// Returns `None` when the underlying channel reports an error (it is
    /// closed and empty).
    pub async fn discover(&self) -> Option<Identity> {
        self.disc.recv().await.ok()
    }

    /// Ask the daemon for its peer list and wait for the `RESP` reply.
    ///
    /// NOTE(review): handles created through [`RatmanIpc::connect`] also have
    /// a background task reading from the same socket, and that task ignores
    /// non-`DISCOVER` peer messages. It looks like the reply can be consumed
    /// there instead of here — confirm against the daemon protocol.
    ///
    /// # Errors
    ///
    /// Returns an error when encoding or writing the request fails, or when
    /// the next message read from the socket is not a peers `RESP`.
    pub async fn get_peers(&self) -> Result<Vec<Identity>> {
        let msg = api::api_peers(api::peers_req());
        write_with_length(&mut self.socket.clone(), &encode_message(msg)?).await?;

        let reply = parse_message(&mut self.socket.clone())
            .await
            .map(|m| m.inner);
        match reply {
            Ok(Some(ApiMessageEnum::peers(s))) if s.field_type == RESP => {
                Ok(s.peers.iter().map(|p| Identity::from_bytes(p)).collect())
            }
            // Unexpected or unreadable replies are reported to the caller
            // rather than aborting the process.
            _ => Self::protocol_error("expected a peers RESP message from the daemon"),
        }
    }

    /// Build an `Err` describing an unexpected reply from the daemon.
    ///
    /// The error is created as an `std::io::Error` and converted with the same
    /// `From` impl that `?` already relies on for socket errors.
    fn protocol_error<T>(what: &'static str) -> Result<T> {
        Err(std::io::Error::new(std::io::ErrorKind::InvalidData, what).into())
    }
}
/// Background loop that reads length-prefixed messages from `socket` and
/// forwards them to the IPC handle.
///
/// * `recv` messages are sent to `tx` together with their receive type.
/// * `peers` messages of type `DISCOVER` forward their first peer entry (if
///   any) to `dtx`.
/// * Every other decodable message is ignored; undecodable or empty payloads
///   are logged and skipped.
///
/// The loop ends when reading from the socket fails, or when a channel send
/// fails. A failed send means the receiving side is gone, so continuing would
/// only keep the socket clone alive and log an error for every message.
async fn run_receive(
    mut socket: TcpStream,
    tx: Sender<(Receive_Type, Message)>,
    dtx: Sender<Identity>,
) {
    loop {
        trace!("Reading message from stream...");
        let msg = match read_with_length(&mut socket).await {
            Ok(msg) => msg,
            Err(e) => {
                error!("Failed to read from socket: {:?}", e);
                break;
            }
        };

        trace!("Parsing message from stream...");
        match types::decode_message(&msg).map(|m| m.inner) {
            Ok(Some(ApiMessageEnum::recv(mut msg))) => {
                let tt = msg.field_type;
                let msg = msg.take_msg();
                debug!("Forwarding message to IPC wrapper");
                if let Err(e) = tx.send((tt, msg.into())).await {
                    error!("Failed to forward received message: {}", e);
                    break;
                }
            }
            Ok(Some(ApiMessageEnum::peers(peers))) if peers.get_field_type() == DISCOVER => {
                if let Some(p) = peers.peers.get(0) {
                    if dtx.send(Identity::from_bytes(p)).await.is_err() {
                        error!("Failed to send discovery to client poller...");
                        break;
                    }
                }
            }
            // Any other well-formed message is not handled by this task.
            Ok(Some(_)) => {}
            Ok(None) | Err(_) => {
                warn!("Invalid payload received; skipping...");
            }
        }
    }
}
/// Integration test: start a local `ratmand` through `cargo run`, connect to
/// it, send a message to our own address and check that it comes back.
///
/// Ignored by default because it spawns a daemon process and relies on a
/// fixed one-second start-up delay.
#[async_std::test]
#[ignore]
async fn send_message() {
    use async_std::task::sleep;
    use std::{
        process::{Child, Command},
        time::Duration,
    };

    /// Kills and reaps the spawned process when dropped, so a failing
    /// `unwrap` or assertion below does not leave it running.
    struct DaemonGuard(Child);

    impl Drop for DaemonGuard {
        fn drop(&mut self) {
            // Best effort: the process may already have exited.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    fn setup_logging() {
        use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter};
        let filter = EnvFilter::default()
            .add_directive(LevelFilter::TRACE.into())
            .add_directive("async_std=error".parse().unwrap())
            .add_directive("async_io=error".parse().unwrap())
            .add_directive("polling=error".parse().unwrap())
            .add_directive("mio=error".parse().unwrap());
        fmt().with_env_filter(filter).init();
    }

    setup_logging();

    let _daemon = DaemonGuard(
        Command::new("cargo")
            .current_dir("../..")
            .args(&[
                "run",
                "--bin",
                "ratmand",
                "--features",
                "daemon",
                "--",
                "--no-inet",
                "--accept-unknown-peers",
            ])
            .spawn()
            .unwrap(),
    );

    // Give the daemon time to come up before connecting.
    sleep(Duration::from_secs(1)).await;

    let client = RatmanIpc::default().await.unwrap();
    let msg = vec![1, 3, 1, 2];
    info!("Sending message: {:?}", msg);
    client.send_to(client.address(), msg).await.unwrap();

    let (_, recv) = client.next().await.unwrap();
    info!("Receiving message: {:?}", recv);
    assert_eq!(recv.get_payload(), &[1, 3, 1, 2]);
    // `_daemon` is dropped here, which kills and reaps the child process.
}