use std::sync::{
Arc,
atomic::{AtomicBool, Ordering::SeqCst},
};
use puffin::{FrameData, FrameView};
use crate::PROTOCOL_VERSION;
pub struct Client {
addr: String,
connected: Arc<AtomicBool>,
alive: Arc<AtomicBool>,
frame_view: Arc<parking_lot::Mutex<FrameView>>,
}
impl Drop for Client {
fn drop(&mut self) {
self.alive.store(false, SeqCst);
}
}
impl Client {
pub fn new(addr: String) -> Self {
let alive = Arc::new(AtomicBool::new(true));
let connected = Arc::new(AtomicBool::new(false));
let frame_view = Arc::new(parking_lot::Mutex::new(FrameView::default()));
let client = Self {
addr: addr.clone(),
connected: Arc::clone(&connected),
alive: Arc::clone(&alive),
frame_view: Arc::clone(&frame_view),
};
let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
.name("http_client_thread".to_owned())
.spawn(move || {
log::info!("Connecting to {addr}…");
while alive.load(SeqCst) {
match std::net::TcpStream::connect(&addr) {
Ok(mut stream) => {
*frame_view.lock() = FrameView::default();
log::info!("Connected to {addr}");
connected.store(true, SeqCst);
while alive.load(SeqCst) {
match consume_message(&mut stream) {
Ok(frame_data) => {
frame_view
.lock()
.add_frame(std::sync::Arc::new(frame_data));
}
Err(err) => {
log::warn!(
"Connection to puffin server closed: {}",
error_display_chain(err.as_ref())
);
connected.store(false, SeqCst);
break;
}
}
}
}
Err(err) => {
log::debug!("Failed to connect to {addr}: {err}");
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
}
})
.expect("Failed to spawn client thread");
client
}
pub fn addr(&self) -> &str {
&self.addr
}
pub fn connected(&self) -> bool {
self.connected.load(SeqCst)
}
pub fn frame_view(&self) -> parking_lot::MutexGuard<'_, FrameView> {
self.frame_view.lock()
}
}
pub fn consume_message(stream: &mut impl std::io::Read) -> anyhow::Result<puffin::FrameData> {
let mut server_version = [0_u8; 2];
stream.read_exact(&mut server_version)?;
let server_version = u16::from_le_bytes(server_version);
match server_version.cmp(&PROTOCOL_VERSION) {
std::cmp::Ordering::Less => {
anyhow::bail!(
"puffin server is using an older protocol version ({server_version}) than the client ({PROTOCOL_VERSION})."
);
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {
anyhow::bail!(
"puffin server is using a newer protocol version ({server_version}) than the client ({PROTOCOL_VERSION}). Update puffin_viewer with 'cargo install puffin_viewer --locked'."
);
}
}
use anyhow::Context as _;
FrameData::read_next(stream)
.context("Failed to parse FrameData")?
.ok_or_else(|| anyhow::format_err!("End of stream"))
}
fn error_display_chain(error: &dyn std::error::Error) -> String {
let mut s = error.to_string();
if let Some(source) = error.source() {
s.push_str(" -> ");
s.push_str(&error_display_chain(source));
}
s
}