mod client;
mod endpoint;
mod routingtable;
mod tcp_pool;
use anyhow::Context;
use dashmap::DashMap;
use derivative::*;
pub use endpoint::*;
use routingtable::*;
use serde::{de::DeserializeOwned, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tap::TapFallible;
mod reqs;
use async_net::{TcpListener, TcpStream};
mod common;
pub use client::request;
pub use common::*;
use parking_lot::RwLock;
use rand::prelude::*;
use rand::seq::SliceRandom;
use rand::thread_rng;
use reqs::*;
use smol::prelude::*;
use smol::Timer;
use smol_timeout::TimeoutExt;
use std::time::Duration;
/// Shared state of one melnet node: the network it belongs to, the peers it
/// knows about, and the verbs (request handlers) it serves.
///
/// Cloning is cheap and yields a handle to the *same* routing table and verb
/// registry, because both live behind `Arc`s; only `network_name` is copied.
#[derive(Derivative, Clone, Default)]
#[derivative(Debug)]
pub struct NetState {
/// Name of the network this node participates in; incoming requests whose
/// `netname` differs are rejected by `server_handle_one`.
network_name: String,
/// Routing table of known peers, shared between all clones of this state.
routes: Arc<RwLock<RoutingTable>>,
/// Registered responders keyed by verb name; left out of the `Debug` output.
#[derivative(Debug = "ignore")]
verbs: Arc<DashMap<String, BoxedResponder>>,
}
impl NetState {
pub async fn run_server(&self, listener: TcpListener) {
let mut this = self.clone();
this.setup_routing();
let spammer = self.new_addr_spam().race(self.get_routes_spam());
spammer
.race(async move {
loop {
let (conn, addr) = listener.accept().await.unwrap();
let self_copy = self.clone();
smolscale::spawn(async move {
if let Err(e) = self_copy.server_handle(conn).await {
log::debug!("{} terminating on error: {:?}", addr, e)
}
})
.detach();
}
})
.await
}
/// Gossips known addresses to peers, forever.
///
/// On every 30-second tick, picks one random known peer as the recipient and
/// one random known route as the advertised address, then sends a `new_addr`
/// request from a detached task. Failures are only logged at debug level.
async fn new_addr_spam(&self) {
    let mut rng = rand::rngs::OsRng {};
    let mut ticker = Timer::interval(Duration::from_secs(30));
    loop {
        ticker.next().await;
        let known = self.routes.read().to_vec();
        if known.is_empty() {
            // Nothing to gossip about yet; wait for the next tick.
            continue;
        }
        // Two independent draws: recipient and advertised address may coincide.
        let rand_neigh = known[rng.gen::<usize>() % known.len()].0;
        let rand_route = known[rng.gen::<usize>() % known.len()].0;
        let network_name = self.network_name.clone();
        let announce = async move {
            let payload = RoutingRequest {
                proto: String::from("tcp"),
                addr: rand_route.to_string(),
            };
            let outcome = crate::request::<RoutingRequest, String>(
                rand_neigh,
                &network_name,
                "new_addr",
                payload,
            )
            .await;
            if let Err(err) = &outcome {
                log::debug!("addrspam failed to {} ({:?})", rand_neigh, err);
            }
        };
        smolscale::spawn(announce).detach();
    }
}
async fn get_routes_spam(&self) {
let mut tmr = Timer::interval(Duration::from_secs(10));
loop {
if let Some(route) = self.routes().get(0).copied() {
let network_name = self.network_name.clone();
let state = self.clone();
smolscale::spawn(async move {
let resp: Vec<SocketAddr> = crate::request::<(), Vec<SocketAddr>>(
route,
&network_name,
"get_routes",
(),
)
.timeout(Duration::from_secs(10))
.await
.context("timeout")
.tap_err(|err| {
log::debug!("could not get routes from {}: {:?}", route, err)
})??;
log::debug!("{} routes from {}", resp.len(), route);
for new_route in resp {
log::debug!("testing {}", new_route);
let state = state.clone();
let network_name = network_name.clone();
smolscale::spawn(async move {
crate::request::<_, u64>(new_route, &network_name, "ping", 10)
.timeout(Duration::from_secs(10))
.await
.context("timeout")
.tap_err(|err| {
log::warn!(
"route {} from {} was unpingable ({:?})!",
new_route,
route,
err
)
})??;
state.add_route(new_route);
Ok::<_, anyhow::Error>(())
})
.detach();
}
Ok::<_, anyhow::Error>(())
})
.detach();
}
tmr.next().await;
}
}
/// Serves requests on one accepted connection until something goes wrong.
///
/// Enables `TCP_NODELAY`, then handles requests one at a time. The loop only
/// ends with an error: either a request failed (the error is logged and
/// returned), or no request completed within 60 seconds ("timeout").
async fn server_handle(&self, mut conn: TcpStream) -> anyhow::Result<()> {
    conn.set_nodelay(true)?;
    loop {
        let outcome = self
            .server_handle_one(&mut conn)
            .timeout(Duration::from_secs(60))
            .await;
        let result = match outcome {
            Some(result) => result,
            None => anyhow::bail!("timeout"),
        };
        if let Err(err) = result {
            log::debug!(
                "connection from {:?} died in error {:?}",
                conn.peer_addr(),
                err
            );
            return Err(err);
        }
    }
}
/// Reads one length-prefixed request from `conn`, dispatches it and replies.
///
/// # Errors
/// Fails when the request cannot be read or decoded, when the protocol
/// version is not 1 (an "Err" response is sent first), when the network name
/// does not match (no response is sent), when the responder yields an error
/// kind that has no wire representation here, or when writing the reply fails.
async fn server_handle_one(&self, conn: &mut TcpStream) -> anyhow::Result<()> {
    let raw = read_len_bts(conn.clone()).await?;
    let cmd: RawRequest = stdcode::deserialize(&raw)?;

    if cmd.proto_ver != 1 {
        let rejection = RawResponse {
            kind: "Err".to_owned(),
            body: stdcode::serialize(&"bad protocol version").unwrap(),
        };
        let encoded = stdcode::serialize(&rejection).unwrap();
        write_len_bts(conn, &encoded).await?;
        return Err(anyhow::anyhow!("bad"));
    }
    if cmd.netname != self.network_name {
        return Err(anyhow::anyhow!("bad"));
    }
    log::trace!("got command {:?} from {:?}", cmd.verb, conn.peer_addr());

    // Start the responder inside a single statement so the map guard is
    // released before anything is awaited.
    let pending = self
        .verbs
        .get(&cmd.verb)
        .map(|responder| responder.0(&cmd.payload));
    let response: Result<Vec<u8>> = match pending {
        Some(fut) => fut.await,
        None => Err(MelnetError::VerbNotFound),
    };

    // Translate the outcome into its wire form; anything unexpected aborts
    // the connection without a reply.
    let reply = match response {
        Ok(resp) => RawResponse {
            kind: "Ok".into(),
            body: resp,
        },
        Err(MelnetError::Custom(string)) => RawResponse {
            kind: "Err".into(),
            body: string.as_bytes().into(),
        },
        Err(MelnetError::VerbNotFound) => RawResponse {
            kind: "NoVerb".into(),
            body: b"".to_vec(),
        },
        err => {
            log::error!(
                "bad error created by responder at verb {}: {:?}",
                cmd.verb,
                err
            );
            anyhow::bail!("wtf")
        }
    };
    write_len_bts(conn, &stdcode::serialize(&reply).unwrap()).await?;
    Ok(())
}
/// Registers the built-in routing verbs on this state.
///
/// * `ping` echoes the `u64` it receives.
/// * `new_addr` accepts a TCP address, resolves it, pings it with 814, and
///   adds it to the routing table only if 814 is echoed back.
/// * `get_routes` returns the currently known routes.
fn setup_routing(&mut self) {
    self.listen("ping", |ping: Request<u64>| async move { Ok(ping.body) });

    self.listen("new_addr", |request: Request<RoutingRequest>| async move {
        let announced = request.body.clone();
        let state = request.state.clone();
        if announced.proto != "tcp" {
            log::debug!(
                "new_addr saw unrecognizable protocol = {:?}",
                announced.proto
            );
            anyhow::bail!("bad protocol");
        }

        let resolved = smol::net::resolve(&announced.addr)
            .await
            .context("cannot resolve address given")?;
        let their_addr = *resolved
            .first()
            .context("zero addresses in the hostname given")?;

        // Only accept the address if the remote actually answers a ping.
        let echoed: u64 =
            crate::request(their_addr, &state.network_name.to_owned(), "ping", 814u64)
                .await
                .tap_err(|err| log::warn!("error while pinging {}: {:?}", their_addr, err))
                .context("remote was unpingable")?;
        if echoed != 814 {
            log::debug!("new_addr bad ping {:?} {:?}", announced.addr, echoed);
            anyhow::bail!("remote responded to ping corruptly");
        }

        let before = state.routes().len();
        state.add_route(their_addr);
        let after = state.routes().len();
        if after > before {
            log::debug!("received route {}; now {} routes", their_addr, after);
        }
        Ok::<String, anyhow::Error>("".into())
    });

    self.listen("get_routes", |request: Request<()>| async move {
        Ok(request.state.routes())
    });
}
/// Registers `responder` as the handler for requests carrying `verb`.
///
/// The responder is wrapped together with a clone of this state and stored
/// in the shared verb registry under the verb's name.
pub fn listen<Req, Resp, T>(&self, verb: &str, responder: T)
where
    Req: DeserializeOwned + Send + 'static,
    Resp: Serialize + Send + 'static,
    T: Endpoint<Req, Resp> + Send + 'static,
{
    let boxed = responder_to_closure(self.clone(), responder);
    self.verbs.insert(verb.to_owned(), boxed);
}
/// Adds `addr` to the shared routing table, taking the write lock to do so.
pub fn add_route(&self, addr: SocketAddr) {
    let mut table = self.routes.write();
    table.add_route(addr);
}
/// Returns the addresses of all known routes, in random order.
pub fn routes(&self) -> Vec<SocketAddr> {
    let mut addrs: Vec<SocketAddr> = self
        .routes
        .read()
        .to_vec()
        .into_iter()
        .map(|entry| entry.0)
        .collect();
    addrs.shuffle(&mut thread_rng());
    addrs
}
/// Replaces this state's network name with an owned copy of `name`.
fn set_name(&mut self, name: &str) {
    self.network_name = name.to_owned();
}
pub fn new_with_name(name: &str) -> Self {
let mut ns = NetState::default();
ns.set_name(name);
ns
}
}