use std::io::Read;
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use super::snapshot::SidebarSnapshot;
type Latest = Arc<Mutex<Option<SidebarSnapshot>>>;
pub struct SnapshotHandle {
latest: Latest,
}
impl SnapshotHandle {
pub fn take(&self) -> Option<SidebarSnapshot> {
self.latest.lock().unwrap().take()
}
}
pub fn connect(socket_path: &Path, wake_tx: mpsc::SyncSender<()>) -> SnapshotHandle {
let latest: Latest = Arc::new(Mutex::new(None));
let latest_clone = latest.clone();
let path = socket_path.to_path_buf();
thread::spawn(move || {
connection_loop(&path, &latest_clone, &wake_tx);
});
SnapshotHandle { latest }
}
fn connection_loop(path: &Path, latest: &Latest, wake_tx: &mpsc::SyncSender<()>) {
let min_backoff = Duration::from_millis(50);
let max_backoff = Duration::from_secs(2);
let jitter = Duration::from_millis((std::process::id() % 100) as u64);
let mut backoff = min_backoff;
loop {
let connected_at = Instant::now();
if let Ok(stream) = UnixStream::connect(path) {
backoff = min_backoff;
if read_loop(stream, latest, wake_tx).is_err() {
break; }
if connected_at.elapsed() <= Duration::from_secs(5) {
backoff = (backoff * 2).min(max_backoff);
}
}
thread::sleep(backoff + jitter);
}
}
fn read_loop(
mut stream: UnixStream,
latest: &Latest,
wake_tx: &mpsc::SyncSender<()>,
) -> Result<(), mpsc::SendError<()>> {
const MAX_PAYLOAD: usize = 1024 * 1024; loop {
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).is_err() {
return Ok(()); }
let len = u32::from_be_bytes(len_buf) as usize;
if len > MAX_PAYLOAD {
return Ok(()); }
let mut buf = vec![0u8; len];
if stream.read_exact(&mut buf).is_err() {
return Ok(());
}
if let Ok(snapshot) = serde_json::from_slice::<SidebarSnapshot>(&buf) {
*latest.lock().unwrap() = Some(snapshot);
if let Err(mpsc::TrySendError::Disconnected(())) = wake_tx.try_send(()) {
return Err(mpsc::SendError(()));
}
}
}
}