framesmith-cli 0.1.0

CLI tool for controlling Samsung Frame TVs over the local network
pub mod connection;
pub mod handler;
pub mod log;

use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::time::Instant;

use anyhow::{Context, Result};
use tokio::net::UnixListener;

use crate::ipc::{self, Request, Response, ServerMessage};
use connection::TvConnection;

/// Server state available to request handlers.
struct ServerState {
    started_at: Instant,
    tv_connected_since: Option<Instant>,
    tv_host: String,
    log_path: PathBuf,
    last_error: Option<String>,
}

/// Compute paths for a given host.
pub fn server_dir() -> PathBuf {
    dirs::home_dir()
        .unwrap_or_else(|| PathBuf::from("."))
        .join(".framesmith")
        .join("servers")
}

pub fn sanitize_host(host: &str) -> String {
    host.replace(['.', ':'], "_")
}

pub fn pid_path(host: &str) -> PathBuf {
    server_dir().join(format!("{}.pid", sanitize_host(host)))
}

pub fn socket_path(host: &str) -> PathBuf {
    server_dir().join(format!("{}.sock", sanitize_host(host)))
}

pub fn log_path(host: &str) -> PathBuf {
    server_dir().join(format!("{}.log", sanitize_host(host)))
}

/// Main entry point for the background server process.
/// Called when `--internal-server` is detected.
pub async fn run_server(host: &str, token_file: &Path, timeout: u64) -> Result<()> {
    // Detach from terminal
    unsafe {
        libc::setsid();
    }

    let dir = server_dir();
    std::fs::create_dir_all(&dir).with_context(|| format!("failed to create {}", dir.display()))?;

    // Set up rotating log
    let log_file_path = log_path(host);
    let log_writer = self::log::RotatingLogWriter::new(&log_file_path)
        .with_context(|| format!("failed to open log file: {}", log_file_path.display()))?;

    // Redirect tracing to log file
    tracing_subscriber::fmt()
        .with_writer(std::sync::Mutex::new(log_writer))
        .with_ansi(false)
        .init();

    tracing::info!(
        "server starting for host {host} (PID {}, token_file={}, timeout={timeout}s)",
        std::process::id(),
        token_file.display()
    );

    // Write PID file
    let pid = std::process::id();
    let pid_file = pid_path(host);
    std::fs::write(&pid_file, pid.to_string())
        .with_context(|| format!("failed to write PID file: {}", pid_file.display()))?;

    // Clean up stale socket and bind immediately so clients can connect
    // while the TV connection is being established
    let sock_path = socket_path(host);
    if sock_path.exists() {
        let _ = std::fs::remove_file(&sock_path);
    }

    let listener = UnixListener::bind(&sock_path)
        .with_context(|| format!("failed to bind socket: {}", sock_path.display()))?;

    tracing::info!("server listening on {}", sock_path.display());

    // Connect to TV (non-fatal if it fails — server retries in background)
    let ip: IpAddr = host
        .parse()
        .with_context(|| format!("invalid IP address: {host}"))?;

    let mut tv_conn = match TvConnection::connect(ip, token_file, timeout).await {
        Ok(c) => {
            tracing::info!("connected to TV at {host}");
            c
        }
        Err(e) => {
            tracing::warn!("initial connection failed (will retry): {e:#}");
            TvConnection::new_disconnected(ip, token_file, timeout)
        }
    };

    let mut state = ServerState {
        started_at: Instant::now(),
        tv_connected_since: if tv_conn.connected {
            Some(Instant::now())
        } else {
            None
        },
        tv_host: host.to_string(),
        log_path: log_file_path,
        last_error: if tv_conn.connected {
            None
        } else {
            Some("initial connection failed".to_string())
        },
    };

    // Main loop
    let mut reconnect_sleep = std::pin::pin!(tokio::time::sleep(tv_conn.backoff_duration()));
    let mut keepalive_interval = tokio::time::interval(std::time::Duration::from_secs(120));
    keepalive_interval.tick().await; // skip immediate first tick

    loop {
        tokio::select! {
            accept_result = listener.accept() => {
                match accept_result {
                    Ok((stream, _addr)) => {
                        let should_shutdown = handle_client(
                            stream, &mut tv_conn, &mut state,
                        ).await;
                        if should_shutdown {
                            tracing::info!("shutdown requested, exiting");
                            break;
                        }
                    }
                    Err(e) => {
                        tracing::error!("accept error: {e}");
                    }
                }
            }
            _ = keepalive_interval.tick(), if tv_conn.connected => {
                tv_conn.send_keepalive().await;
            }
            _ = &mut reconnect_sleep, if !tv_conn.connected => {
                if tv_conn.attempt_reconnect().await {
                    state.tv_connected_since = Some(Instant::now());
                    state.last_error = None;
                }
                // Reset sleep with current backoff duration
                reconnect_sleep.set(tokio::time::sleep(tv_conn.backoff_duration()));
            }
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("SIGINT received, shutting down");
                break;
            }
        }
    }

    cleanup(&pid_file, &sock_path);
    Ok(())
}

async fn handle_client(
    stream: tokio::net::UnixStream,
    tv_conn: &mut TvConnection,
    state: &mut ServerState,
) -> bool {
    // Try to get the client PID for logging
    let client_pid = stream.peer_cred().ok().and_then(|c| c.pid());
    let client_label = match client_pid {
        Some(pid) => format!("client PID {pid}"),
        None => "client (unknown PID)".to_string(),
    };

    let (mut reader, mut writer) = stream.into_split();

    // Read request
    let versioned: ipc::VersionedRequest = match ipc::read_message(&mut reader).await {
        Ok(r) => r,
        Err(e) => {
            tracing::warn!("{client_label}: failed to read request: {e}");
            return false;
        }
    };

    // Check protocol version
    if versioned.version != ipc::PROTOCOL_VERSION {
        tracing::warn!(
            "{client_label}: version mismatch (server={}, client={}), shutting down",
            ipc::PROTOCOL_VERSION,
            versioned.version,
        );
        let msg = ServerMessage::VersionMismatch {
            server_version: ipc::PROTOCOL_VERSION.to_string(),
            client_version: versioned.version,
        };
        let _ = ipc::write_message(&mut writer, &msg).await;
        return true; // signal shutdown
    }

    let request = versioned.request;
    tracing::info!("{client_label}: received request: {request:?}");

    // Handle server-management requests specially
    match &request {
        Request::Shutdown => {
            tracing::info!("{client_label}: shutdown requested");
            let msg = ServerMessage::ShuttingDown;
            let _ = ipc::write_message(&mut writer, &msg).await;
            return true; // signal shutdown
        }
        Request::Status => {
            let uptime = state.started_at.elapsed();
            let tv_uptime = state.tv_connected_since.map(|t| t.elapsed());
            let data = serde_json::json!({
                "pid": std::process::id(),
                "host": state.tv_host,
                "log_path": state.log_path.display().to_string(),
                "server_uptime_secs": uptime.as_secs(),
                "tv_connected": tv_conn.connected,
                "connection_uptime_secs": tv_uptime.map(|d| d.as_secs()),
                "last_error": state.last_error,
            });
            let msg = ServerMessage::Response(Response::Ok { data });
            let _ = ipc::write_message(&mut writer, &msg).await;
            tracing::info!("{client_label}: responded to status request");
            return false;
        }
        _ => {}
    }

    // Spawn heartbeat task
    let (heartbeat_tx, heartbeat_rx) = tokio::sync::oneshot::channel::<()>();
    let writer_for_heartbeat = {
        let writer = std::sync::Arc::new(tokio::sync::Mutex::new(writer));
        let writer_clone = writer.clone();
        let mut heartbeat_rx = heartbeat_rx;

        let _heartbeat_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
            interval.tick().await; // skip immediate first tick
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        let mut w = writer_clone.lock().await;
                        if ipc::write_message(&mut *w, &ServerMessage::Heartbeat).await.is_err() {
                            break;
                        }
                    }
                    _ = &mut heartbeat_rx => {
                        break;
                    }
                }
            }
        });

        writer
    };

    // Execute request with one retry on connection failure.
    // If the TV is disconnected (or the request fails with a connection error),
    // attempt exactly one reconnect and retry before giving up.
    let started = Instant::now();
    let response = dispatch_with_retry(request, tv_conn, state, &client_label).await;
    let elapsed = started.elapsed();

    match &response {
        Response::TvDisconnected { message } => {
            tracing::error!(
                "{client_label}: request failed (TV unreachable) in {elapsed:.1?}: {message}"
            );
        }
        Response::Error { message } => {
            tracing::warn!("{client_label}: request failed in {elapsed:.1?}: {message}");
        }
        Response::Ok { .. } => {
            tracing::info!("{client_label}: request completed successfully in {elapsed:.1?}");
        }
    }

    // Cancel heartbeat
    let _ = heartbeat_tx.send(());

    // Send response
    let msg = ServerMessage::Response(response);
    let mut w = writer_for_heartbeat.lock().await;
    let _ = ipc::write_message(&mut *w, &msg).await;

    false
}

/// Try to dispatch a request, reconnecting once on connection failure.
///
/// Covers two cases:
/// 1. TV is already known-disconnected: attempt one reconnect, then dispatch.
/// 2. TV appears connected but the request fails with a connection error
///    (stale WebSocket): reconnect once and retry the same request.
///
/// If reconnection or the retry also fails, the TV is marked disconnected
/// (triggering the normal backoff loop) and the error is returned to the client.
async fn dispatch_with_retry(
    request: Request,
    tv_conn: &mut TvConnection,
    state: &mut ServerState,
    client_label: &str,
) -> Response {
    // If already disconnected, try one reconnect before giving up
    if !tv_conn.connected || tv_conn.tv.is_none() {
        tracing::info!(
            "{client_label}: TV disconnected, attempting one reconnect before dispatching"
        );
        if tv_conn.attempt_reconnect().await {
            state.tv_connected_since = Some(Instant::now());
            state.last_error = None;
            tracing::info!("{client_label}: reconnect succeeded, proceeding with request");
        } else {
            tracing::warn!("{client_label}: reconnect failed, rejecting request");
            return Response::TvDisconnected {
                message: format!(
                    "TV at {} is not reachable. Server is attempting to reconnect.",
                    tv_conn.host()
                ),
            };
        }
    }

    // At this point we have a connected TV — dispatch the request
    let tv = tv_conn.tv.as_ref().unwrap();
    let response = handler::dispatch(request.clone(), tv).await;

    // If the dispatch succeeded (or failed with a non-connection error), we're done
    if !matches!(&response, Response::TvDisconnected { .. }) {
        return response;
    }

    // Connection error on a supposedly-connected TV — stale connection.
    // Try exactly one reconnect + retry.
    if let Response::TvDisconnected { ref message } = response {
        tracing::warn!(
            "{client_label}: request hit stale connection: {message}; attempting reconnect + retry"
        );
    }

    tv_conn.mark_disconnected();

    if tv_conn.attempt_reconnect().await {
        state.tv_connected_since = Some(Instant::now());
        state.last_error = None;
        tracing::info!("{client_label}: reconnect succeeded, retrying request");

        let tv = tv_conn.tv.as_ref().unwrap();
        let retry_response = handler::dispatch(request, tv).await;

        // If the retry also fails with a connection error, mark disconnected
        if let Response::TvDisconnected { ref message } = retry_response {
            tracing::error!("{client_label}: retry also failed with connection error: {message}");
            tv_conn.mark_disconnected();
            state.tv_connected_since = None;
            state.last_error = Some(message.clone());
        }

        retry_response
    } else {
        tracing::error!("{client_label}: reconnect failed after stale connection, giving up");
        state.tv_connected_since = None;
        state.last_error = Some("reconnect failed after stale connection".to_string());
        response
    }
}

fn cleanup(pid_file: &Path, sock_path: &Path) {
    let _ = std::fs::remove_file(pid_file);
    let _ = std::fs::remove_file(sock_path);
    tracing::info!("cleaned up PID and socket files");
}