pub mod connection;
pub mod handler;
pub mod log;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::time::Instant;
use anyhow::{Context, Result};
use tokio::net::UnixListener;
use crate::ipc::{self, Request, Response, ServerMessage};
use connection::TvConnection;
/// Mutable bookkeeping the server keeps for answering status requests.
struct ServerState {
    /// Moment the server state was created; basis for the reported server uptime.
    started_at: Instant,
    /// Host string of the TV this server instance was started for.
    tv_host: String,
    /// Location of the server's log file, echoed back in status responses.
    log_path: PathBuf,
    /// When the current TV connection was established, or `None` while disconnected.
    tv_connected_since: Option<Instant>,
    /// Most recent connection error, cleared again after a successful (re)connect.
    last_error: Option<String>,
}
/// Returns the directory that holds the per-host server files.
///
/// The result is `<home>/.framesmith/servers`. When no home directory can be
/// determined, the current directory (`.`) is used as the base instead.
pub fn server_dir() -> PathBuf {
    let mut dir = match dirs::home_dir() {
        Some(home) => home,
        None => PathBuf::from("."),
    };
    dir.push(".framesmith");
    dir.push("servers");
    dir
}
/// Turns a host string into a file-name-friendly form.
///
/// Every `.` and `:` is replaced with `_`; all other characters are kept as-is.
pub fn sanitize_host(host: &str) -> String {
    host.chars()
        .map(|ch| if matches!(ch, '.' | ':') { '_' } else { ch })
        .collect()
}
/// Returns the PID file path for `host`: `<server_dir>/<sanitized host>.pid`.
pub fn pid_path(host: &str) -> PathBuf {
    let file_name = format!("{}.pid", sanitize_host(host));
    let mut path = server_dir();
    path.push(file_name);
    path
}
/// Returns the Unix socket path for `host`: `<server_dir>/<sanitized host>.sock`.
pub fn socket_path(host: &str) -> PathBuf {
    let file_name = format!("{}.sock", sanitize_host(host));
    let mut path = server_dir();
    path.push(file_name);
    path
}
/// Returns the log file path for `host`: `<server_dir>/<sanitized host>.log`.
pub fn log_path(host: &str) -> PathBuf {
    let file_name = format!("{}.log", sanitize_host(host));
    let mut path = server_dir();
    path.push(file_name);
    path
}
pub async fn run_server(host: &str, token_file: &Path, timeout: u64) -> Result<()> {
unsafe {
libc::setsid();
}
let dir = server_dir();
std::fs::create_dir_all(&dir).with_context(|| format!("failed to create {}", dir.display()))?;
let log_file_path = log_path(host);
let log_writer = self::log::RotatingLogWriter::new(&log_file_path)
.with_context(|| format!("failed to open log file: {}", log_file_path.display()))?;
tracing_subscriber::fmt()
.with_writer(std::sync::Mutex::new(log_writer))
.with_ansi(false)
.init();
tracing::info!(
"server starting for host {host} (PID {}, token_file={}, timeout={timeout}s)",
std::process::id(),
token_file.display()
);
let pid = std::process::id();
let pid_file = pid_path(host);
std::fs::write(&pid_file, pid.to_string())
.with_context(|| format!("failed to write PID file: {}", pid_file.display()))?;
let sock_path = socket_path(host);
if sock_path.exists() {
let _ = std::fs::remove_file(&sock_path);
}
let listener = UnixListener::bind(&sock_path)
.with_context(|| format!("failed to bind socket: {}", sock_path.display()))?;
tracing::info!("server listening on {}", sock_path.display());
let ip: IpAddr = host
.parse()
.with_context(|| format!("invalid IP address: {host}"))?;
let mut tv_conn = match TvConnection::connect(ip, token_file, timeout).await {
Ok(c) => {
tracing::info!("connected to TV at {host}");
c
}
Err(e) => {
tracing::warn!("initial connection failed (will retry): {e:#}");
TvConnection::new_disconnected(ip, token_file, timeout)
}
};
let mut state = ServerState {
started_at: Instant::now(),
tv_connected_since: if tv_conn.connected {
Some(Instant::now())
} else {
None
},
tv_host: host.to_string(),
log_path: log_file_path,
last_error: if tv_conn.connected {
None
} else {
Some("initial connection failed".to_string())
},
};
let mut reconnect_sleep = std::pin::pin!(tokio::time::sleep(tv_conn.backoff_duration()));
let mut keepalive_interval = tokio::time::interval(std::time::Duration::from_secs(120));
keepalive_interval.tick().await;
loop {
tokio::select! {
accept_result = listener.accept() => {
match accept_result {
Ok((stream, _addr)) => {
let should_shutdown = handle_client(
stream, &mut tv_conn, &mut state,
).await;
if should_shutdown {
tracing::info!("shutdown requested, exiting");
break;
}
}
Err(e) => {
tracing::error!("accept error: {e}");
}
}
}
_ = keepalive_interval.tick(), if tv_conn.connected => {
tv_conn.send_keepalive().await;
}
_ = &mut reconnect_sleep, if !tv_conn.connected => {
if tv_conn.attempt_reconnect().await {
state.tv_connected_since = Some(Instant::now());
state.last_error = None;
}
reconnect_sleep.set(tokio::time::sleep(tv_conn.backoff_duration()));
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("SIGINT received, shutting down");
break;
}
}
}
cleanup(&pid_file, &sock_path);
Ok(())
}
/// Serves a single client connection: reads one request and writes the reply.
///
/// `Shutdown` and `Status` are answered directly. Every other request is
/// forwarded to the TV via `dispatch_with_retry`; while that runs, a background
/// task sends a `Heartbeat` message to the client every 5 seconds.
///
/// Returns `true` when the server should shut down (explicit shutdown request
/// or protocol version mismatch), `false` otherwise. Failures to write to the
/// client are ignored.
async fn handle_client(
    stream: tokio::net::UnixStream,
    tv_conn: &mut TvConnection,
    state: &mut ServerState,
) -> bool {
    let client_pid = stream.peer_cred().ok().and_then(|c| c.pid());
    let client_label = match client_pid {
        Some(pid) => format!("client PID {pid}"),
        None => "client (unknown PID)".to_string(),
    };

    let (mut reader, mut writer) = stream.into_split();

    let versioned: ipc::VersionedRequest = match ipc::read_message(&mut reader).await {
        Ok(r) => r,
        Err(e) => {
            tracing::warn!("{client_label}: failed to read request: {e}");
            return false;
        }
    };

    if versioned.version != ipc::PROTOCOL_VERSION {
        tracing::warn!(
            "{client_label}: version mismatch (server={}, client={}), shutting down",
            ipc::PROTOCOL_VERSION,
            versioned.version,
        );
        let msg = ServerMessage::VersionMismatch {
            server_version: ipc::PROTOCOL_VERSION.to_string(),
            client_version: versioned.version,
        };
        let _ = ipc::write_message(&mut writer, &msg).await;
        // A mismatched client makes this server exit.
        return true;
    }

    let request = versioned.request;
    tracing::info!("{client_label}: received request: {request:?}");

    match &request {
        Request::Shutdown => {
            tracing::info!("{client_label}: shutdown requested");
            let msg = ServerMessage::ShuttingDown;
            let _ = ipc::write_message(&mut writer, &msg).await;
            return true;
        }
        Request::Status => {
            let uptime = state.started_at.elapsed();
            // Only report a connection uptime while the TV is actually
            // connected; the stored timestamp may be stale after a drop.
            let tv_uptime = state
                .tv_connected_since
                .filter(|_| tv_conn.connected)
                .map(|t| t.elapsed());
            let data = serde_json::json!({
                "pid": std::process::id(),
                "host": state.tv_host,
                "log_path": state.log_path.display().to_string(),
                "server_uptime_secs": uptime.as_secs(),
                "tv_connected": tv_conn.connected,
                "connection_uptime_secs": tv_uptime.map(|d| d.as_secs()),
                "last_error": state.last_error,
            });
            let msg = ServerMessage::Response(Response::Ok { data });
            let _ = ipc::write_message(&mut writer, &msg).await;
            tracing::info!("{client_label}: responded to status request");
            return false;
        }
        _ => {}
    }

    // The write half is shared between the heartbeat task and the final reply.
    let (heartbeat_tx, mut heartbeat_rx) = tokio::sync::oneshot::channel::<()>();
    let writer = std::sync::Arc::new(tokio::sync::Mutex::new(writer));
    let heartbeat_writer = std::sync::Arc::clone(&writer);
    let heartbeat_handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
        // The first tick completes immediately; skip it so the first heartbeat
        // goes out after a full period.
        interval.tick().await;
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let mut w = heartbeat_writer.lock().await;
                    if ipc::write_message(&mut *w, &ServerMessage::Heartbeat).await.is_err() {
                        break;
                    }
                }
                _ = &mut heartbeat_rx => {
                    break;
                }
            }
        }
    });

    let started = Instant::now();
    let response = dispatch_with_retry(request, tv_conn, state, &client_label).await;
    let elapsed = started.elapsed();

    match &response {
        Response::TvDisconnected { message } => {
            tracing::error!(
                "{client_label}: request failed (TV unreachable) in {elapsed:.1?}: {message}"
            );
        }
        Response::Error { message } => {
            tracing::warn!("{client_label}: request failed in {elapsed:.1?}: {message}");
        }
        Response::Ok { .. } => {
            tracing::info!("{client_label}: request completed successfully in {elapsed:.1?}");
        }
    }

    // Stop the heartbeat task and wait for it to finish, so no heartbeat can
    // be written after the final response. The send fails harmlessly if the
    // task already exited on a write error.
    let _ = heartbeat_tx.send(());
    let _ = heartbeat_handle.await;

    let msg = ServerMessage::Response(response);
    let mut w = writer.lock().await;
    let _ = ipc::write_message(&mut *w, &msg).await;
    false
}
/// Dispatches `request` to the TV, reconnecting and retrying once if needed.
///
/// Flow:
/// 1. If the TV is currently disconnected, try one reconnect first; on failure
///    the request is rejected with `Response::TvDisconnected`.
/// 2. Dispatch the request. Any response other than `TvDisconnected` is
///    returned as-is.
/// 3. On `TvDisconnected`, mark the connection as down, reconnect once and
///    retry the request once.
///
/// `state.tv_connected_since` and `state.last_error` are updated to reflect
/// the outcome of each reconnect. `client_label` is only used for log lines.
async fn dispatch_with_retry(
    request: Request,
    tv_conn: &mut TvConnection,
    state: &mut ServerState,
    client_label: &str,
) -> Response {
    if !tv_conn.connected || tv_conn.tv.is_none() {
        tracing::info!(
            "{client_label}: TV disconnected, attempting one reconnect before dispatching"
        );
        if tv_conn.attempt_reconnect().await {
            state.tv_connected_since = Some(Instant::now());
            state.last_error = None;
            tracing::info!("{client_label}: reconnect succeeded, proceeding with request");
        } else {
            tracing::warn!("{client_label}: reconnect failed, rejecting request");
            // Keep status data consistent with the other failure paths, but do
            // not overwrite a more specific error recorded earlier.
            state.tv_connected_since = None;
            if state.last_error.is_none() {
                state.last_error = Some("reconnect failed before dispatch".to_string());
            }
            return Response::TvDisconnected {
                message: format!(
                    "TV at {} is not reachable. Server is attempting to reconnect.",
                    tv_conn.host()
                ),
            };
        }
    }

    // A missing handle is treated like a stale connection instead of panicking.
    let response = match tv_conn.tv.as_ref() {
        Some(tv) => handler::dispatch(request.clone(), tv).await,
        None => Response::TvDisconnected {
            message: format!("TV at {} has no active connection handle", tv_conn.host()),
        },
    };

    if let Response::TvDisconnected { message } = &response {
        tracing::warn!(
            "{client_label}: request hit stale connection: {message}; attempting reconnect + retry"
        );
    } else {
        return response;
    }

    tv_conn.mark_disconnected();
    if !tv_conn.attempt_reconnect().await {
        tracing::error!("{client_label}: reconnect failed after stale connection, giving up");
        state.tv_connected_since = None;
        state.last_error = Some("reconnect failed after stale connection".to_string());
        return response;
    }

    state.tv_connected_since = Some(Instant::now());
    state.last_error = None;
    tracing::info!("{client_label}: reconnect succeeded, retrying request");

    let retry_response = match tv_conn.tv.as_ref() {
        Some(tv) => handler::dispatch(request, tv).await,
        None => Response::TvDisconnected {
            message: format!("TV at {} has no active connection handle", tv_conn.host()),
        },
    };
    if let Response::TvDisconnected { message } = &retry_response {
        tracing::error!("{client_label}: retry also failed with connection error: {message}");
        tv_conn.mark_disconnected();
        state.tv_connected_since = None;
        state.last_error = Some(message.clone());
    }
    retry_response
}
fn cleanup(pid_file: &Path, sock_path: &Path) {
let _ = std::fs::remove_file(pid_file);
let _ = std::fs::remove_file(sock_path);
tracing::info!("cleaned up PID and socket files");
}