1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result, bail};
use indicatif::{ProgressBar, ProgressStyle};
use tokio::net::UnixStream;
use crate::ipc::{self, Request, Response, ServerMessage};
use crate::server;
/// Outcome of a single request/response exchange with the server, as
/// produced by `Client::send_request_inner`.
///
/// Only `Response` is a final answer; the other two variants tell
/// `Client::send_request` to restart the server and retry once.
enum SendResult {
/// The server answered the request with a regular response payload.
Response(Response),
/// The server replied with `ServerMessage::VersionMismatch`, i.e. it
/// reported a protocol version mismatch for this request.
VersionMismatch,
/// The server dropped the connection immediately after we sent our request,
/// likely because it's an older version that can't deserialize the current
/// wire format.
ServerIncompatible,
}
/// A thin client that communicates with the background server over a Unix socket.
///
/// The client holds no connection state: every request opens a fresh socket
/// connection, spawning the server process first if none is reachable.
pub struct Client {
// Identifies the server instance: used to derive the socket, PID and log
// paths (via `server::*_path`) and forwarded to a spawned server as `--host`.
host: String,
// Forwarded verbatim to a spawned server as `--auth-token-file`.
auth_token_file: PathBuf,
// Forwarded to a spawned server as `--timeout`.
// NOTE(review): the unit is not visible in this file — presumably seconds; confirm.
timeout: u64,
}
impl Client {
    /// Creates a client for the server instance associated with `host`.
    ///
    /// `auth_token_file` and `timeout` are not interpreted here; within this
    /// impl they are only forwarded as command-line arguments when a server
    /// process has to be spawned.
    pub fn new(host: &str, auth_token_file: &Path, timeout: u64) -> Self {
        Self {
            host: host.to_string(),
            auth_token_file: auth_token_file.to_owned(),
            timeout,
        }
    }

    /// Send a request to the server, auto-starting it if needed.
    /// Shows a TUI spinner while waiting.
    ///
    /// If the server reports a version mismatch, or drops the connection
    /// before answering (early EOF), the server is terminated, a fresh one is
    /// started, and the request is retried exactly once.
    ///
    /// # Errors
    ///
    /// Fails if the server cannot be reached or started, if the connection is
    /// lost, if the server announces it is shutting down, or if the retry
    /// after a restart hits the same version/compatibility problem again.
    pub async fn send_request(&self, request: Request) -> Result<Response> {
        match self.send_request_inner(&request).await? {
            SendResult::Response(resp) => Ok(resp),
            SendResult::VersionMismatch | SendResult::ServerIncompatible => {
                // Server is shutting itself down (VersionMismatch) or couldn't
                // parse our request at all (ServerIncompatible, e.g. a pre-versioning
                // server). Either way: kill it, start a new one, retry once.
                self.kill_and_wait_for_server_exit().await;
                match self.send_request_inner(&request).await? {
                    SendResult::Response(resp) => Ok(resp),
                    SendResult::VersionMismatch => {
                        bail!("server version mismatch persisted after restart");
                    }
                    SendResult::ServerIncompatible => {
                        bail!("lost connection to server after restart (early EOF)");
                    }
                }
            }
        }
    }

    /// Performs one connect → send → await-response exchange with the server.
    ///
    /// The request is cloned up front so the async block owns its own copy
    /// instead of borrowing the caller's. The spinner is cleared on every
    /// exit path, success or error.
    ///
    /// # Errors
    ///
    /// Fails if connecting or writing fails, if the server sends
    /// `ShuttingDown`, on any read error other than an early EOF, or if no
    /// message arrives for 10 seconds and the server process is gone.
    fn send_request_inner(
        &self,
        request: &Request,
    ) -> impl std::future::Future<Output = Result<SendResult>> + '_ {
        let request = request.clone();
        async move {
            let spinner = make_spinner("Connecting to server...");
            let stream = self
                .connect_or_start()
                .await
                // Clear the spinner before the error propagates, like every
                // other failure path below does.
                .inspect_err(|_| spinner.finish_and_clear())
                .context("failed to connect to server")?;
            spinner.set_message("Waiting for TV...");

            // Send versioned request
            let (mut reader, mut writer) = stream.into_split();
            let versioned = ipc::VersionedRequest {
                version: ipc::PROTOCOL_VERSION.to_string(),
                request,
            };
            ipc::write_message(&mut writer, &versioned)
                .await
                .inspect_err(|_| spinner.finish_and_clear())
                .context("failed to send request to server")?;

            // Read response loop (handle heartbeats)
            let result = loop {
                match tokio::time::timeout(
                    Duration::from_secs(10),
                    ipc::read_message::<_, ServerMessage>(&mut reader),
                )
                .await
                {
                    Ok(Ok(ServerMessage::Heartbeat)) => {
                        // Each loop iteration starts a fresh 10s timeout, so a
                        // heartbeat effectively resets the deadline.
                        continue;
                    }
                    Ok(Ok(ServerMessage::Response(resp))) => {
                        break SendResult::Response(resp);
                    }
                    Ok(Ok(ServerMessage::ShuttingDown)) => {
                        spinner.finish_and_clear();
                        bail!("server is shutting down");
                    }
                    Ok(Ok(ServerMessage::VersionMismatch {
                        server_version,
                        client_version: _,
                    })) => {
                        spinner.finish_and_clear();
                        tracing::debug!(
                            "server version mismatch (server={server_version}, \
                             client={}), restarting server",
                            ipc::PROTOCOL_VERSION,
                        );
                        break SendResult::VersionMismatch;
                    }
                    Ok(Err(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                        // Server dropped the connection without responding.
                        // Most likely an older server that can't parse our
                        // VersionedRequest wire format.
                        spinner.finish_and_clear();
                        tracing::debug!(
                            "server dropped connection (early EOF) — \
                             likely an incompatible server version"
                        );
                        break SendResult::ServerIncompatible;
                    }
                    Ok(Err(e)) => {
                        spinner.finish_and_clear();
                        bail!("lost connection to server: {e}");
                    }
                    Err(_) => {
                        // 10s timeout with no heartbeat — check if server is alive
                        if !is_server_alive(&self.host) {
                            spinner.finish_and_clear();
                            bail!("server process died unexpectedly");
                        }
                        // Server alive but silent — keep waiting. There is no
                        // upper bound here: we wait as long as the process lives.
                        continue;
                    }
                }
            };
            spinner.finish_and_clear();
            Ok(result)
        }
    }

    /// Kill the server process (if running) and wait for it to exit.
    ///
    /// Used when the server is incompatible (e.g. older version that can't
    /// parse our wire format) or has signalled a version mismatch and is
    /// shutting itself down. In the latter case the SIGTERM is harmless —
    /// the server is already exiting.
    ///
    /// Polls up to 50 times at 100ms intervals (about 5 seconds). Once the
    /// server is gone, its PID and socket files are removed. If the server is
    /// still alive after the last poll, this returns without reporting it.
    async fn kill_and_wait_for_server_exit(&self) {
        // Only a strictly positive PID addresses a single process. For
        // kill(2), 0 means "the caller's process group" and negative values
        // mean a process group or every process, so a corrupt PID file (0, or
        // a value that does not fit in i32) must never be passed through.
        let target = read_server_pid(&self.host)
            .and_then(|pid| i32::try_from(pid).ok())
            .filter(|&pid| pid > 0);
        if let Some(pid) = target {
            // SAFETY: `kill` is a plain FFI syscall with no memory-safety
            // preconditions; `pid` is strictly positive, so the signal goes to
            // at most one process.
            unsafe {
                libc::kill(pid, libc::SIGTERM);
            }
        }
        for _ in 0..50 {
            if !is_server_alive(&self.host) {
                // Clean up stale files in case the old server didn't
                let _ = std::fs::remove_file(server::pid_path(&self.host));
                let _ = std::fs::remove_file(server::socket_path(&self.host));
                return;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    /// Connect to the server socket, auto-starting the server if necessary.
    ///
    /// An existing, connectable socket is used as-is. Otherwise stale PID and
    /// socket files are removed (only when a PID file exists and its process
    /// is not alive), a new server is spawned, and the socket is polled up to
    /// 100 times at 100ms intervals.
    ///
    /// # Errors
    ///
    /// Fails if the server process cannot be spawned or if the socket does
    /// not accept a connection within the polling window.
    async fn connect_or_start(&self) -> Result<UnixStream> {
        let sock = server::socket_path(&self.host);
        let pid_file = server::pid_path(&self.host);

        // Try connecting to existing server first
        if sock.exists()
            && let Ok(stream) = UnixStream::connect(&sock).await
        {
            return Ok(stream);
        }

        // Clean up stale files
        if pid_file.exists() && !is_server_alive(&self.host) {
            let _ = std::fs::remove_file(&pid_file);
            let _ = std::fs::remove_file(&sock);
        }

        // Start a new server
        self.spawn_server()?;

        // Wait for socket to appear
        for _ in 0..100 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            if let Ok(stream) = UnixStream::connect(&sock).await {
                return Ok(stream);
            }
        }
        bail!(
            "server failed to start within 10s (check log at {})",
            server::log_path(&self.host).display()
        )
    }

    /// Spawns the current executable again in `--internal-server` mode.
    ///
    /// The child gets this client's host, auth-token file and timeout as
    /// arguments, with stdin/stdout/stderr all redirected to null. The child
    /// handle is dropped without being waited on.
    ///
    /// # Errors
    ///
    /// Fails if the current executable path cannot be determined or if the
    /// process cannot be spawned.
    fn spawn_server(&self) -> Result<()> {
        let exe = std::env::current_exe().context("failed to determine current executable path")?;
        std::process::Command::new(exe)
            .arg("--internal-server")
            .arg("--host")
            .arg(&self.host)
            .arg("--auth-token-file")
            .arg(&self.auth_token_file)
            .arg("--timeout")
            .arg(self.timeout.to_string())
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .spawn()
            .context("failed to spawn server process")?;
        Ok(())
    }
}
/// Check if the server process for a host is still alive.
///
/// Reads the PID recorded in the host's PID file and probes that process
/// with signal 0, which performs the existence check without delivering a
/// signal.
///
/// Returns `false` when the PID file is missing or unreadable, when its
/// contents are not a strictly positive integer, or when the probe fails for
/// any reason (including a permission error).
pub fn is_server_alive(host: &str) -> bool {
    let pid_file = server::pid_path(host);
    let Ok(contents) = std::fs::read_to_string(&pid_file) else {
        return false;
    };
    match contents.trim().parse::<i32>() {
        // For kill(2), a PID of 0 or below addresses a process group (or
        // every process), not one process — such a probe would succeed even
        // though no server exists, so only strictly positive PIDs count.
        Ok(pid) if pid > 0 => {
            // SAFETY: `kill` is a plain FFI syscall with no memory-safety
            // preconditions; signal 0 only checks existence and delivers nothing.
            unsafe { libc::kill(pid, 0) == 0 }
        }
        _ => false,
    }
}
/// Returns the PID stored in the PID file for `host`.
///
/// Yields `None` when the file cannot be read or when its trimmed contents
/// do not parse as a `u32`.
pub fn read_server_pid(host: &str) -> Option<u32> {
    let path = server::pid_path(host);
    let raw = std::fs::read_to_string(&path).ok()?;
    raw.trim().parse::<u32>().ok()
}
/// Builds a cyan spinner showing `msg` that redraws itself every 100ms.
///
/// The caller is responsible for finishing or clearing the returned bar.
fn make_spinner(msg: &str) -> ProgressBar {
    let spinner = ProgressBar::new_spinner();

    // The template is a fixed literal, so a parse failure would be a
    // programming error rather than a runtime condition.
    let style = ProgressStyle::default_spinner()
        .template("{spinner:.cyan} {msg}")
        .unwrap();
    spinner.set_style(style);

    let text = msg.to_string();
    spinner.set_message(text);

    let tick_interval = Duration::from_millis(100);
    spinner.enable_steady_tick(tick_interval);

    spinner
}