cflx 0.6.153

Conflux – a spec-driven parallel coding orchestrator that runs AI agents on git worktrees
//! Web monitoring module for Conflux.
//!
//! Provides an optional HTTP server with REST API and WebSocket support
//! for monitoring orchestration state remotely via web browser.

mod url;

#[cfg(feature = "web-monitoring")]
pub mod api;
#[cfg(feature = "web-monitoring")]
pub mod state;
#[cfg(feature = "web-monitoring")]
pub mod websocket;

#[cfg(feature = "web-monitoring")]
use axum::{
    http::{header, StatusCode},
    response::{Html, IntoResponse, Response},
    routing::get,
    Router,
};
#[cfg(feature = "web-monitoring")]
use std::net::SocketAddr;
#[cfg(feature = "web-monitoring")]
use std::sync::Arc;
#[cfg(feature = "web-monitoring")]
use tower_http::cors::{Any, CorsLayer};
#[cfg(feature = "web-monitoring")]
use tower_http::trace::TraceLayer;
#[cfg(feature = "web-monitoring")]
use tracing::{debug, info};

#[cfg(feature = "web-monitoring")]
pub use state::WebState;

pub use url::build_access_url;

/// Web server configuration
#[derive(Debug, Clone)]
pub struct WebConfig {
    /// Whether web monitoring is enabled
    #[allow(dead_code)]
    pub enabled: bool,
    /// Port to bind the HTTP server
    pub port: u16,
    /// Address to bind the HTTP server
    pub bind: String,
    /// Interval in seconds for periodic state refresh from disk (0 to disable)
    pub refresh_interval_secs: u64,
}

impl Default for WebConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            port: 0, // Auto-assign by OS
            bind: "127.0.0.1".to_string(),
            refresh_interval_secs: 5, // Default: refresh every 5 seconds
        }
    }
}

impl WebConfig {
    /// Create a new WebConfig with web monitoring enabled
    pub fn enabled(port: u16, bind: String) -> Self {
        Self {
            enabled: true,
            port,
            bind,
            refresh_interval_secs: 5, // Default: refresh every 5 seconds
        }
    }

    /// Set the refresh interval
    #[allow(dead_code)]
    pub fn with_refresh_interval(mut self, secs: u64) -> Self {
        self.refresh_interval_secs = secs;
        self
    }
}

/// Embedded static files
#[cfg(feature = "web-monitoring")]
mod static_files {
    pub const INDEX_HTML: &str = include_str!("../../web/index.html");
    pub const STYLE_CSS: &str = include_str!("../../web/style.css");
    pub const APP_JS: &str = include_str!("../../web/app.js");
}

#[cfg(feature = "web-monitoring")]
async fn serve_index() -> Html<&'static str> {
    Html(static_files::INDEX_HTML)
}

#[cfg(feature = "web-monitoring")]
async fn serve_css() -> Response {
    (
        StatusCode::OK,
        [(header::CONTENT_TYPE, "text/css")],
        static_files::STYLE_CSS,
    )
        .into_response()
}

#[cfg(feature = "web-monitoring")]
async fn serve_js() -> Response {
    (
        StatusCode::OK,
        [(header::CONTENT_TYPE, "application/javascript")],
        static_files::APP_JS,
    )
        .into_response()
}

/// Start the web monitoring server
#[cfg(feature = "web-monitoring")]
#[allow(dead_code)]
pub async fn start_server(
    config: WebConfig,
    state: Arc<WebState>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr: SocketAddr = format!("{}:{}", config.bind, config.port).parse()?;

    // CORS configuration for local development
    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any);

    // Build router with API and static routes
    let app = Router::new()
        // Static file routes
        .route("/", get(serve_index))
        .route("/style.css", get(serve_css))
        .route("/app.js", get(serve_js))
        // API routes
        .route("/api/health", get(api::health))
        .route("/api/state", get(api::get_state))
        .route("/api/changes", get(api::list_changes))
        .route("/api/changes/{id}", get(api::get_change))
        // Control API routes
        .route(
            "/api/control/start",
            axum::routing::post(api::control_start),
        )
        .route("/api/control/stop", axum::routing::post(api::control_stop))
        .route(
            "/api/control/cancel-stop",
            axum::routing::post(api::control_cancel_stop),
        )
        .route(
            "/api/control/force-stop",
            axum::routing::post(api::control_force_stop),
        )
        .route(
            "/api/control/retry",
            axum::routing::post(api::control_retry),
        )
        // Worktree API routes
        .route("/api/worktrees", get(api::list_worktrees))
        .route(
            "/api/worktrees/refresh",
            axum::routing::post(api::refresh_worktrees),
        )
        .route(
            "/api/worktrees/create",
            axum::routing::post(api::create_worktree),
        )
        .route(
            "/api/worktrees/delete",
            axum::routing::post(api::delete_worktree),
        )
        .route(
            "/api/worktrees/merge",
            axum::routing::post(api::merge_worktree),
        )
        .route(
            "/api/worktrees/command",
            axum::routing::post(api::execute_worktree_command),
        )
        // WebSocket route
        .route("/ws", get(websocket::ws_handler))
        .layer(cors)
        .layer(TraceLayer::new_for_http())
        .with_state(state);

    // Bind to the specified address (port 0 = OS auto-assign)
    let listener = tokio::net::TcpListener::bind(addr).await?;
    // Get the actual bound address (includes OS-assigned port if port was 0)
    let actual_addr = listener.local_addr()?;
    info!("Web monitoring server listening on http://{}", actual_addr);

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    Ok(())
}

#[cfg(feature = "web-monitoring")]
async fn shutdown_signal() {
    tokio::signal::ctrl_c()
        .await
        .expect("Failed to install Ctrl+C handler");
    info!("Shutting down web server...");
}

/// Start the web server in a background task (non-blocking)
#[cfg(feature = "web-monitoring")]
#[allow(dead_code)]
pub fn spawn_server(
    config: WebConfig,
    state: Arc<WebState>,
) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
    tokio::spawn(async move { start_server(config, state).await })
}

/// Start the web server and return the accessible URL.
///
/// This function binds to the specified address, determines the actual port
/// (important when port 0 is used for auto-assignment), and constructs an
/// accessible URL that can be used for QR code generation.
///
/// Returns a tuple of (JoinHandle, accessible_url).
#[cfg(feature = "web-monitoring")]
pub async fn spawn_server_with_url(
    config: WebConfig,
    state: Arc<WebState>,
) -> Result<
    (
        tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>,
        String,
    ),
    Box<dyn std::error::Error + Send + Sync>,
> {
    let addr: SocketAddr = format!("{}:{}", config.bind, config.port).parse()?;

    // CORS configuration for local development
    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any);

    // Build router with API and static routes
    let app = Router::new()
        // Static file routes
        .route("/", get(serve_index))
        .route("/style.css", get(serve_css))
        .route("/app.js", get(serve_js))
        // API routes
        .route("/api/health", get(api::health))
        .route("/api/state", get(api::get_state))
        .route("/api/changes", get(api::list_changes))
        .route("/api/changes/{id}", get(api::get_change))
        // Control API routes
        .route(
            "/api/control/start",
            axum::routing::post(api::control_start),
        )
        .route("/api/control/stop", axum::routing::post(api::control_stop))
        .route(
            "/api/control/cancel-stop",
            axum::routing::post(api::control_cancel_stop),
        )
        .route(
            "/api/control/force-stop",
            axum::routing::post(api::control_force_stop),
        )
        .route(
            "/api/control/retry",
            axum::routing::post(api::control_retry),
        )
        // Worktree API routes
        .route("/api/worktrees", get(api::list_worktrees))
        .route(
            "/api/worktrees/refresh",
            axum::routing::post(api::refresh_worktrees),
        )
        .route(
            "/api/worktrees/create",
            axum::routing::post(api::create_worktree),
        )
        .route(
            "/api/worktrees/delete",
            axum::routing::post(api::delete_worktree),
        )
        .route(
            "/api/worktrees/merge",
            axum::routing::post(api::merge_worktree),
        )
        .route(
            "/api/worktrees/command",
            axum::routing::post(api::execute_worktree_command),
        )
        // WebSocket route
        .route("/ws", get(websocket::ws_handler))
        .layer(cors)
        .layer(TraceLayer::new_for_http())
        .with_state(state.clone());

    // Bind to the specified address (port 0 = OS auto-assign)
    let listener = tokio::net::TcpListener::bind(addr).await?;
    // Get the actual bound address (includes OS-assigned port if port was 0)
    let actual_addr = listener.local_addr()?;
    let actual_port = actual_addr.port();

    // Build accessible URL using local IP for 0.0.0.0 bind
    let url = build_access_url(&config.bind, actual_port);
    info!("Web monitoring server listening on {}", url);

    // Spawn periodic refresh task if enabled
    if config.refresh_interval_secs > 0 {
        let state_clone = state;
        let interval = config.refresh_interval_secs;
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                interval.tick().await;
                if let Err(e) = state_clone.refresh_from_disk().await {
                    debug!("Periodic refresh failed: {}", e);
                }
            }
        });
    }

    // Spawn the server in a background task
    let handle = tokio::spawn(async move {
        axum::serve(listener, app)
            .with_graceful_shutdown(shutdown_signal())
            .await?;
        Ok(())
    });

    Ok((handle, url))
}

// Stub implementations for when web-monitoring feature is disabled
#[cfg(not(feature = "web-monitoring"))]
pub async fn start_server(
    _config: WebConfig,
    _state: (),
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    Err("Web monitoring feature is not enabled. Compile with --features web-monitoring".into())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_web_config_default() {
        let config = WebConfig::default();
        assert!(!config.enabled);
        assert_eq!(config.port, 0); // Auto-assign by OS
        assert_eq!(config.bind, "127.0.0.1");
    }

    #[test]
    fn test_web_config_enabled() {
        let config = WebConfig::enabled(9000, "0.0.0.0".to_string());
        assert!(config.enabled);
        assert_eq!(config.port, 9000);
        assert_eq!(config.bind, "0.0.0.0");
    }

    #[test]
    fn test_web_config_auto_assign_port() {
        // When port is 0, OS will auto-assign an available port
        let config = WebConfig::enabled(0, "127.0.0.1".to_string());
        assert!(config.enabled);
        assert_eq!(config.port, 0);
        assert_eq!(config.bind, "127.0.0.1");
    }
}