use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::workspace::{self, WorkspaceSnapshot};
pub(crate) const DEBOUNCE: Duration = Duration::from_millis(150);
pub(crate) const QUEUE_CAPACITY: usize = 4;
pub(crate) const SHUTDOWN_DEADLINE: Duration = Duration::from_secs(5);
pub(crate) const USER_SAVE_TIMEOUT: Duration = Duration::from_secs(30);
pub(crate) enum SnapshotJob {
Auto {
session_name: String,
snapshot: WorkspaceSnapshot,
},
UserSave {
path: PathBuf,
snapshot: WorkspaceSnapshot,
ack: mpsc::SyncSender<Result<(), String>>,
},
Shutdown,
}
pub(crate) struct SnapshotWorker {
tx: mpsc::SyncSender<SnapshotJob>,
handle: Option<JoinHandle<()>>,
}
impl SnapshotWorker {
pub(crate) fn spawn() -> Self {
let (tx, rx) = mpsc::sync_channel::<SnapshotJob>(QUEUE_CAPACITY);
let handle = std::thread::Builder::new()
.name("ezpn-snapshot".into())
.spawn(move || run(rx))
.expect("spawn ezpn-snapshot thread");
Self {
tx,
handle: Some(handle),
}
}
pub(crate) fn submit(&self, job: SnapshotJob) -> bool {
self.tx.try_send(job).is_ok()
}
pub(crate) fn submit_with_deadline(&self, mut job: SnapshotJob, deadline: Duration) -> bool {
use std::sync::mpsc::TrySendError;
let start = Instant::now();
loop {
match self.tx.try_send(job) {
Ok(()) => return true,
Err(TrySendError::Full(j)) => {
if start.elapsed() >= deadline {
return false;
}
job = j;
std::thread::sleep(Duration::from_millis(20));
}
Err(TrySendError::Disconnected(_)) => return false,
}
}
}
#[cfg(test)]
pub(crate) fn shutdown(mut self) {
drain_and_join(&self.tx, self.handle.take());
}
}
impl Drop for SnapshotWorker {
fn drop(&mut self) {
drain_and_join(&self.tx, self.handle.take());
}
}
fn drain_and_join(tx: &mpsc::SyncSender<SnapshotJob>, handle: Option<JoinHandle<()>>) {
let Some(h) = handle else { return };
let _ = tx.send(SnapshotJob::Shutdown);
let deadline = Instant::now() + SHUTDOWN_DEADLINE;
while !h.is_finished() && Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(50));
}
if h.is_finished() {
let _ = h.join();
} else {
eprintln!("ezpn: snapshot worker did not drain within 5s; leaking handle");
std::mem::forget(h);
}
}
fn run(rx: mpsc::Receiver<SnapshotJob>) {
let mut pending: Option<(String, WorkspaceSnapshot, Instant)> = None;
loop {
let timeout = pending
.as_ref()
.map(|(_, _, t0)| (*t0 + DEBOUNCE).saturating_duration_since(Instant::now()))
.unwrap_or(Duration::from_secs(60 * 60));
match rx.recv_timeout(timeout) {
Ok(SnapshotJob::Auto {
session_name,
snapshot,
}) => {
pending = Some((session_name, snapshot, Instant::now()));
}
Ok(SnapshotJob::UserSave {
path,
snapshot,
ack,
}) => {
let result = write_user_snapshot(&path, &snapshot).map_err(|e| e.to_string());
let _ = ack.send(result);
}
Ok(SnapshotJob::Shutdown) => {
if let Some((session, snapshot, _)) = pending.take() {
workspace::auto_save(&session, &snapshot);
}
return;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
if let Some((session, snapshot, _)) = pending.take() {
workspace::auto_save(&session, &snapshot);
}
}
Err(mpsc::RecvTimeoutError::Disconnected) => return,
}
}
}
fn write_user_snapshot(path: &Path, snapshot: &WorkspaceSnapshot) -> anyhow::Result<()> {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)?;
}
}
let json = serde_json::to_vec_pretty(snapshot)?;
let pid = std::process::id();
let file_name = path
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_else(|| "snapshot.json".to_string());
let tmp = path.with_file_name(format!("{file_name}.tmp.{pid}"));
if let Err(e) = std::fs::write(&tmp, &json) {
let _ = std::fs::remove_file(&tmp);
return Err(e.into());
}
if let Err(e) = std::fs::rename(&tmp, path) {
let _ = std::fs::remove_file(&tmp);
return Err(e.into());
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
static ENV_LOCK: Mutex<()> = Mutex::new(());
fn dummy_snapshot() -> WorkspaceSnapshot {
WorkspaceSnapshot {
version: workspace::SNAPSHOT_VERSION,
shell: "/bin/sh".to_string(),
border_style: crate::render::BorderStyle::Single,
show_status_bar: true,
show_tab_bar: true,
scrollback: 1000,
active_tab: 0,
tabs: vec![workspace::TabSnapshot {
name: "1".to_string(),
layout: crate::layout::Layout::from_grid(1, 1),
active_pane: 0,
zoomed_pane: None,
broadcast: false,
panes: vec![],
}],
}
}
#[test]
fn worker_writes_user_save_atomically() {
let dir = tempfile::tempdir().unwrap();
let dest = dir.path().join("save.json");
let worker = SnapshotWorker::spawn();
let (ack_tx, ack_rx) = mpsc::sync_channel(1);
let ok = worker.submit(SnapshotJob::UserSave {
path: dest.clone(),
snapshot: dummy_snapshot(),
ack: ack_tx,
});
assert!(ok);
let result = ack_rx.recv_timeout(Duration::from_secs(5)).unwrap();
assert!(result.is_ok(), "user save must succeed: {result:?}");
assert!(dest.exists(), "atomic rename must produce final file");
let leftovers: Vec<_> = std::fs::read_dir(dir.path())
.unwrap()
.flatten()
.filter(|e| e.file_name().to_string_lossy().contains(".tmp."))
.collect();
assert!(leftovers.is_empty(), "no .tmp.* files: {leftovers:?}");
worker.shutdown();
}
#[test]
fn worker_shutdown_drains_pending_auto() {
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let dir = tempfile::tempdir().unwrap();
unsafe {
std::env::set_var("XDG_DATA_HOME", dir.path());
}
let session = format!("worker-shutdown-{}", std::process::id());
let worker = SnapshotWorker::spawn();
worker.submit(SnapshotJob::Auto {
session_name: session.clone(),
snapshot: dummy_snapshot(),
});
worker.shutdown();
let expected = dir
.path()
.join("ezpn")
.join("sessions")
.join(format!("{session}.json"));
assert!(
expected.exists(),
"shutdown must drain pending Auto to {expected:?}"
);
unsafe {
std::env::remove_var("XDG_DATA_HOME");
}
}
#[test]
fn worker_debounces_rapid_auto_jobs() {
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let dir = tempfile::tempdir().unwrap();
unsafe {
std::env::set_var("XDG_DATA_HOME", dir.path());
}
let session = format!("worker-debounce-{}", std::process::id());
let worker = SnapshotWorker::spawn();
let send_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
for _ in 0..5 {
if worker.submit(SnapshotJob::Auto {
session_name: session.clone(),
snapshot: dummy_snapshot(),
}) {
send_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
std::thread::sleep(Duration::from_millis(20));
}
std::thread::sleep(DEBOUNCE + Duration::from_millis(150));
let path = dir
.path()
.join("ezpn")
.join("sessions")
.join(format!("{session}.json"));
assert!(path.exists(), "debounced auto-save must reach disk");
worker.shutdown();
unsafe {
std::env::remove_var("XDG_DATA_HOME");
}
}
}