use std::io::{self, Read, Write};
use std::mem;
use std::net::*;
use std::time::Duration;
use crate::rdma::{mr::*, qp::*};
/// Sends `buf` over `stream` as one length-prefixed frame.
///
/// The frame consists of `buf.len()` encoded as a little-endian `usize`
/// (so the prefix is as wide as the sender's `usize`), immediately followed
/// by the payload bytes. An empty `buf` produces a prefix-only frame.
///
/// # Errors
///
/// Returns any I/O error reported by the underlying stream, including
/// `ErrorKind::WriteZero` if the stream stops accepting bytes before the
/// whole frame has been written.
fn stream_write(stream: &mut &TcpStream, buf: &[u8]) -> io::Result<()> {
    stream.write_all(&buf.len().to_le_bytes())?;
    // `write_all` retries on `Interrupted` and turns a zero-length write into
    // a `WriteZero` error; a hand-written `write` loop would spin forever on
    // `Ok(0)` and abort the frame on a mere interruption.
    stream.write_all(buf)
}
/// Receives one length-prefixed frame from `stream` and returns its payload.
///
/// The frame layout is a little-endian `usize` length followed by that many
/// payload bytes. Exactly one frame is consumed; bytes after it are left in
/// the stream.
///
/// # Errors
///
/// Returns any I/O error reported by the underlying stream, and
/// `ErrorKind::UnexpectedEof` if the stream ends before the prefix or the
/// announced number of payload bytes has been received.
fn stream_read(stream: &mut &TcpStream) -> io::Result<Vec<u8>> {
    // Upper bound on what is allocated before any payload byte has arrived.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut prefix = [0u8; mem::size_of::<usize>()];
    stream.read_exact(&mut prefix)?;
    let len = usize::from_le_bytes(prefix);

    // The length comes from the peer, so never allocate `len` bytes up front:
    // a corrupt or hostile prefix would otherwise panic or exhaust memory.
    // Reading through `take` lets the buffer grow only as data really arrives
    // and guarantees that nothing beyond this frame is consumed.
    let mut payload = Vec::with_capacity(len.min(MAX_PREALLOC));
    // Widening `usize` to `u64` is lossless on every supported target.
    let mut frame = Read::take(&mut *stream, len as u64);
    let received = frame.read_to_end(&mut payload)?;
    if received != len {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "stream ended before the whole frame was received",
        ));
    }
    Ok(payload)
}
/// Keeps trying to open a TCP connection to `server_addr` until one succeeds.
///
/// Every failed attempt is followed by a sleep of `wait_on_failure` before
/// the next try. Connection errors are discarded, so this function blocks
/// indefinitely while the server is unreachable and, as written, only ever
/// returns `Ok`.
fn connect_until_success(
    server_addr: SocketAddrV4,
    wait_on_failure: Duration,
) -> io::Result<TcpStream> {
    loop {
        match TcpStream::connect(server_addr) {
            Ok(connected) => return Ok(connected),
            // Any failure is treated as "not ready yet": back off and retry.
            Err(_) => std::thread::sleep(wait_on_failure),
        }
    }
}
/// TCP side channel used to exchange queue-pair endpoints and memory-region
/// descriptors with a peer.
#[derive(Debug)]
pub struct Connecter {
    /// Address this side connected to; `None` when this side instead listened
    /// for and accepted the peer's connection.
    with: Option<Ipv4Addr>,
    /// The TCP stream to the peer over which all frames are sent and received.
    stream: Option<TcpStream>,
}
impl Connecter {
pub fn connect_local(first: &mut Qp, second: &mut Qp) -> io::Result<()> {
let ep_first = first.endpoint().unwrap();
let ep_second = second.endpoint().unwrap();
first
.bind_peer(ep_second)
.and_then(|_| second.bind_peer(ep_first))
}
pub const DEFAULT_PORT: u16 = 13337;
pub fn new_on_port(with: Option<Ipv4Addr>, port: u16) -> io::Result<Self> {
let stream = if let Some(addr) = with.as_ref() {
let server_addr = SocketAddrV4::new(*addr, port);
connect_until_success(server_addr, Duration::from_millis(200))?
} else {
let inaddr_any = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
let listener = TcpListener::bind(inaddr_any)?;
listener.accept()?.0
};
Ok(Self {
with,
stream: Some(stream),
})
}
pub fn new(with: Option<Ipv4Addr>) -> io::Result<Self> {
Self::new_on_port(with, Self::DEFAULT_PORT)
}
pub fn connect(&self, qp: &mut Qp) -> io::Result<Option<QpPeer>> {
let ep = qp.endpoint();
let ep = serde_json::to_string(&ep)?;
let mut stream = self.stream.as_ref().unwrap();
let ep = if self.with.is_some() {
let buf = stream_read(&mut stream)?;
let peer = serde_json::from_slice::<QpEndpoint>(buf.as_slice())?;
stream_write(&mut stream, ep.as_bytes())?;
peer
} else {
stream_write(&mut stream, ep.as_bytes())?;
let buf = stream_read(&mut stream)?;
serde_json::from_slice::<QpEndpoint>(buf.as_slice())?
};
if qp.qp_type() == QpType::Rc {
qp.bind_peer(ep)?;
Ok(None)
} else {
let sgid_index = if qp.use_global_routing() {
qp.port().unwrap().1
} else {
0
};
QpPeer::new(qp.pd(), sgid_index, ep).map(Some)
}
}
pub fn send_mr(&self, slice: MrRemote) -> io::Result<()> {
let mr = serde_json::to_string(&slice)?;
let mut stream = self.stream.as_ref().unwrap();
stream_write(&mut stream, mr.as_bytes())?;
Ok(())
}
pub fn recv_mr(&self) -> io::Result<MrRemote> {
let mut stream = self.stream.as_ref().unwrap();
let buf = stream_read(&mut stream)?;
let mr = serde_json::from_slice::<MrRemote>(buf.as_slice())?;
Ok(mr)
}
}