use std::{
io::Write,
net::{SocketAddr, TcpStream},
time::{Duration, Instant},
};
/// Errors returned by [`TcpClient`] when talking to the Rerun server.
///
/// Both variants carry the server address and the underlying I/O error.
#[derive(thiserror::Error, Debug)]
pub enum ClientError {
/// Opening the TCP connection failed (returned by [`TcpClient::connect`]).
#[error("Failed to connect to Rerun server at {addr:?}: {err}")]
Connect {
/// Address of the server the client tried to reach.
addr: SocketAddr,
/// I/O error reported by the connection attempt.
err: std::io::Error,
},
/// Writing to the server failed: either the protocol header written right
/// after connecting, or a packet written by [`TcpClient::send`].
#[error("Failed to send to Rerun server at {addr:?}: {err}")]
Send {
/// Address of the server the client was writing to.
addr: SocketAddr,
/// I/O error reported by the write.
err: std::io::Error,
},
}
/// Connection state tracked by `TcpClient`.
enum TcpStreamState {
/// No usable stream: either nothing has been attempted yet, or the last
/// attempt / write / flush failed.
Pending {
/// When this pending period started. Failed connection attempts keep
/// this value, so it measures the whole time spent without a connection.
start_time: Instant,
/// Number of failed connection attempts since `start_time`.
num_attempts: usize,
},
/// An open stream on which the protocol version and header were written.
Connected(TcpStream),
}
impl TcpStreamState {
    /// Returns a brand-new `Pending` state: the clock starts at the moment of
    /// the call and no connection attempts have been recorded yet.
    fn reset() -> Self {
        let start_time = Instant::now();
        Self::Pending {
            num_attempts: 0,
            start_time,
        }
    }
}
/// TCP client for a Rerun server, built on `std::net::TcpStream`.
///
/// The connection is opened on demand by `connect` / `send`; each packet is
/// written with a little-endian `u32` length prefix.
pub struct TcpClient {
/// Server address used for every connection attempt.
addr: SocketAddr,
/// Whether a stream is currently open, and if not, since when.
stream_state: TcpStreamState,
/// Grace period consulted by `has_timed_out_for_flush`; with `None` that
/// method never reports a timeout.
flush_timeout: Option<Duration>,
}
impl TcpClient {
    /// Creates a client for `addr` without opening a connection.
    ///
    /// The connection is established on demand by [`Self::connect`] (which
    /// [`Self::send`] calls). `flush_timeout` is only consulted by
    /// [`Self::has_timed_out_for_flush`].
    pub fn new(addr: SocketAddr, flush_timeout: Option<Duration>) -> Self {
        Self {
            addr,
            stream_state: TcpStreamState::reset(),
            flush_timeout,
        }
    }

    /// Ensures there is an open connection, performing the protocol handshake
    /// if a new one has to be opened.
    ///
    /// Returns immediately if already connected. Otherwise tries to connect
    /// (waiting at most 5 seconds) and then writes the protocol version and
    /// header to the new stream.
    ///
    /// # Errors
    ///
    /// * [`ClientError::Connect`] if the TCP connection cannot be opened.
    /// * [`ClientError::Send`] if the version/header cannot be written in full.
    ///
    /// In both cases the client stays pending with its attempt counter
    /// incremented and its original start time preserved.
    pub fn connect(&mut self) -> Result<(), ClientError> {
        let (start_time, num_attempts) = match self.stream_state {
            TcpStreamState::Connected(_) => return Ok(()),
            TcpStreamState::Pending {
                start_time,
                num_attempts,
            } => (start_time, num_attempts),
        };
        let addr = self.addr;

        // State to fall back to on any failure below: same start time (so the
        // flush timeout keeps measuring the whole outage), one more attempt.
        let after_failure = TcpStreamState::Pending {
            start_time,
            num_attempts: num_attempts + 1,
        };

        re_log::debug!("Connecting to {:?}…", self.addr);
        let timeout = Duration::from_secs(5);
        let mut stream = match TcpStream::connect_timeout(&addr, timeout) {
            Ok(stream) => stream,
            Err(err) => {
                self.stream_state = after_failure;
                return Err(ClientError::Connect { addr, err });
            }
        };
        re_log::debug!("Connected to {:?}.", self.addr);

        // `write_all` rather than `write`: a single `write` may accept only
        // part of the buffer, which would leave a truncated header on the wire.
        let handshake = stream
            .write_all(&crate::PROTOCOL_VERSION_1.to_le_bytes())
            .and_then(|()| stream.write_all(crate::PROTOCOL_HEADER.as_bytes()));

        match handshake {
            Ok(()) => {
                self.stream_state = TcpStreamState::Connected(stream);
                Ok(())
            }
            Err(err) => {
                self.stream_state = after_failure;
                Err(ClientError::Send { addr, err })
            }
        }
    }

    /// Sends one packet, connecting first if necessary.
    ///
    /// The packet is framed as a little-endian `u32` byte length followed by
    /// the payload.
    ///
    /// # Errors
    ///
    /// * Any error from [`Self::connect`].
    /// * [`ClientError::Send`] with an `InvalidInput` I/O error if the packet
    ///   length does not fit in the `u32` prefix; nothing is written and the
    ///   connection is kept.
    /// * [`ClientError::Send`] if writing fails; the connection is dropped and
    ///   the client goes back to a fresh pending state.
    pub fn send(&mut self, packet: &[u8]) -> Result<(), ClientError> {
        use std::io::Write as _;

        self.connect()?;
        let addr = self.addr;

        re_log::trace!("Sending a packet of size {}…", packet.len());

        // Refuse oversized packets instead of silently truncating the length
        // prefix, which would desynchronize the framing for the receiver.
        let packet_len = u32::try_from(packet.len()).map_err(|_| ClientError::Send {
            addr,
            err: std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "packet of {} bytes does not fit in the u32 length prefix",
                    packet.len()
                ),
            ),
        })?;

        let TcpStreamState::Connected(stream) = &mut self.stream_state else {
            unreachable!("self.connect should have ensured this");
        };

        // `write_all` guarantees the prefix and the payload are written in
        // full; plain `write` may stop after a partial write.
        let written = stream
            .write_all(&packet_len.to_le_bytes())
            .and_then(|()| stream.write_all(packet));

        if let Err(err) = written {
            self.stream_state = TcpStreamState::reset();
            return Err(ClientError::Send { addr, err });
        }
        Ok(())
    }

    /// Flushes the stream if connected.
    ///
    /// Never returns an error: if the client is still pending a warning is
    /// logged (once), and if flushing fails a warning is logged and the
    /// client goes back to a fresh pending state.
    pub fn flush(&mut self) {
        re_log::trace!("Attempting to flush TCP stream…");
        match &mut self.stream_state {
            TcpStreamState::Pending { .. } => {
                re_log::warn_once!(
                    "Tried to flush while TCP stream was still Pending. Data was possibly dropped."
                );
            }
            TcpStreamState::Connected(stream) => match stream.flush() {
                Ok(()) => {
                    re_log::trace!("TCP stream flushed.");
                }
                Err(err) => {
                    re_log::warn!("Failed to flush TCP stream: {err}");
                    self.stream_state = TcpStreamState::reset();
                }
            },
        }
    }

    /// Reports whether the client has been without a connection for longer
    /// than the configured flush timeout.
    ///
    /// Returns `true` only when all of the following hold: the client is
    /// pending, a flush timeout was configured, at least one connection
    /// attempt has failed, and more than the timeout has elapsed since the
    /// pending period started.
    pub fn has_timed_out_for_flush(&self) -> bool {
        match self.stream_state {
            TcpStreamState::Pending {
                start_time,
                num_attempts,
            } => self
                .flush_timeout
                .is_some_and(|timeout| num_attempts > 0 && start_time.elapsed() > timeout),
            TcpStreamState::Connected(_) => false,
        }
    }
}