use std::net::ToSocketAddrs;
use std::net::{Ipv4Addr, SocketAddr};
use anyhow::{Error, Result};
use async_trait::async_trait;
use tokio::net::{TcpListener, TcpStream};
use crate::connector::Connector;
pub struct Config {
listen_addr: SocketAddr,
}
const DEFAULT_TCP_PORT: u16 = 9045;
impl Default for Config {
fn default() -> Self {
Config {
listen_addr: SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
}
}
}
pub(crate) struct TcpConnector {
listener: TcpListener,
}
impl TryFrom<Config> for TcpConnector {
type Error = Error;
fn try_from(config: Config) -> Result<Self, Self::Error> {
let listener = std::net::TcpListener::bind(config.listen_addr)?;
Ok(TcpConnector {
listener: TcpListener::from_std(listener)?,
})
}
}
#[async_trait]
impl Connector for TcpConnector {
async fn new_outgoing_connection(
&mut self,
locator: &crate::locator::Peer,
) -> Result<crate::connection::Connection> {
if !locator.transport.eq("tcp") {
return Err(Error::msg("not a tcp address"));
}
let port: u16 = match locator.hints.get("port") {
Some(port_as_str) => port_as_str.parse()?,
None => DEFAULT_TCP_PORT,
};
let addr = (locator.designator.as_str(), port)
.to_socket_addrs()?
.next()
.ok_or_else(|| Error::msg("could not resolve designator host"))?;
let stream = TcpStream::connect(addr).await?;
Ok(stream.into())
}
async fn accept_incoming_connection(
&mut self,
) -> Result<Option<crate::connection::Connection>> {
let (stream, _) = self.listener.accept().await?;
Ok(Some(stream.into()))
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, time::Duration};
use ocapn_syrup::Value;
use tokio::{join, select, spawn, time::interval};
use crate::{connector::Connector, locator::Peer, Error, Result};
use super::{Config, TcpConnector};
struct PingPongAgent {
connector: TcpConnector,
}
impl PingPongAgent {
fn new() -> Self {
let c = Config::default();
PingPongAgent {
connector: c.try_into().expect("create connector"),
}
}
fn whoami(&self) -> Peer {
let local_addr = self.connector.listener.local_addr().expect("local address");
Peer {
designator: local_addr.ip().to_string(),
transport: "tcp".to_owned(),
hints: HashMap::from([("port".to_owned(), local_addr.port().to_string())]),
}
}
async fn run_client(&mut self, peer: Peer) -> Result<()> {
let mut outbound = self
.connector
.new_outgoing_connection(&peer)
.await
.expect("connect to peer");
let mut pongs_rcvd = 0u32;
for _ in 0..10 {
outbound.send(Value::String("ping".to_owned())).await?;
}
let mut timeout = interval(Duration::from_secs(5));
timeout.tick().await;
loop {
select! {
res = outbound.recv() => {
let value = res?;
assert_eq!(value, Value::string("pong"));
pongs_rcvd+=1;
if pongs_rcvd >= 10 {
break;
}
}
_ = timeout.tick() => {
println!("client giving up");
return Err(Error::msg("timeout waiting for pings and pongs"));
}
}
}
if pongs_rcvd >= 10 {
outbound.close().await?;
Ok(())
} else {
Err(Error::msg("missing some pings and/or pongs hmm"))
}
}
async fn run_server(&mut self) -> Result<()> {
let mut timeout = interval(Duration::from_secs(5));
timeout.tick().await;
loop {
select! {
res = self.connector.accept_incoming_connection() => {
let mut conn = res?.ok_or(Error::msg("connector shut down"))?;
for _ in 0..10 {
let ping = conn.recv().await?;
assert_eq!(ping, Value::string("ping"));
conn.send(Value::string("pong")).await?;
}
return Ok(());
}
_ = timeout.tick() => {
println!("server giving up");
return Err(Error::msg("timeout waiting for pings and pongs"));
}
}
}
}
}
#[tokio::test]
async fn client_server_oneway() {
let mut alice = PingPongAgent::new();
let alice_locator = alice.whoami();
let mut bob = PingPongAgent::new();
let alice_task = spawn(async move { alice.run_server().await });
let bob_task = spawn(async move { bob.run_client(alice_locator).await });
let (alice_result, bob_result) = join!(alice_task, bob_task);
alice_result.expect("alice run").expect("alice run ok");
bob_result.expect("bob run").expect("bob run ok");
}
}