Skip to main content

flow_server/
lib.rs

1//! High-performance web server for Agent Flow task monitoring.
2//!
3//! Built on [axum](https://docs.rs/axum) with real-time updates via
4//! Server-Sent Events (SSE) and WebSocket connections. Watches the
5//! filesystem for task changes and broadcasts updates to all connected clients.
6//!
7//! # Features
8//!
9//! - REST API for sessions, tasks, and features
10//! - SSE and WebSocket endpoints for live updates
11//! - File watcher using the `notify` crate
12//! - Optional `SQLite` database for feature management
13//! - Static file serving for the web UI
14
15pub mod error;
16pub mod helpers;
17pub mod routes;
18pub mod sse;
19pub mod state;
20pub mod watcher;
21pub mod ws;
22
23pub use error::{AppError, AppResult};
24pub use state::{AppState, MetadataCache};
25
26use axum::{
27    routing::{delete, get, post},
28    Router,
29};
30use flow_core::AgentConfig;
31use std::sync::Arc;
32use tokio::sync::{broadcast, RwLock};
33use tower_http::services::ServeDir;
34use tracing::info;
35
36/// Build the axum router with all routes
37pub fn build_router(state: Arc<AppState>) -> Router {
38    let mut app = Router::new()
39        .route("/api/sessions", get(routes::sessions::list_sessions))
40        .route(
41            "/api/sessions/:session_id",
42            get(routes::sessions::get_session),
43        )
44        .route("/api/tasks/all", get(routes::tasks::get_all_tasks))
45        .route(
46            "/api/tasks/:session_id/:task_id/note",
47            post(routes::tasks::add_note),
48        )
49        .route(
50            "/api/tasks/:session_id/:task_id",
51            delete(routes::tasks::delete_task),
52        )
53        .route("/api/events", get(sse::sse_handler))
54        .route("/api/ws", get(ws::ws_handler))
55        .route("/api/theme", get(routes::theme::get_theme))
56        .route("/api/theme", post(routes::theme::set_theme));
57
58    // Add feature routes if database is available
59    if state.db.is_some() {
60        app = app.nest("/api/features", routes::features::feature_routes());
61    }
62
63    app.with_state(state)
64}
65
66/// Run the axum server with the given configuration
67#[allow(clippy::cognitive_complexity)]
68pub async fn run_server(config: AgentConfig) -> flow_core::Result<()> {
69    let tasks_dir = config.tasks_dir();
70    let projects_dir = config.projects_dir();
71
72    info!("Tasks directory: {}", tasks_dir.display());
73    info!("Projects directory: {}", projects_dir.display());
74
75    // Determine public directory
76    let public_dir = config.public_dir.as_ref().map_or_else(
77        || {
78            // Default to ../public relative to binary, or ./public as fallback
79            std::env::current_exe()
80                .ok()
81                .and_then(|p| p.parent().map(std::path::Path::to_path_buf))
82                .map_or_else(
83                    || std::path::PathBuf::from("public"),
84                    |exe| {
85                        let candidate = exe.join("..").join("public");
86                        if candidate.exists() {
87                            candidate
88                        } else {
89                            std::path::PathBuf::from("public")
90                        }
91                    },
92                )
93        },
94        std::clone::Clone::clone,
95    );
96
97    info!("Public directory: {}", public_dir.display());
98
99    // Create broadcast channel for SSE/WS (buffer 256 messages)
100    let (tx, _) = broadcast::channel::<String>(256);
101
102    // Database is optional - can be added later for feature management
103    let db = None;
104
105    let state = Arc::new(AppState {
106        tasks_dir: tasks_dir.clone(),
107        projects_dir: projects_dir.clone(),
108        tx: tx.clone(),
109        metadata_cache: RwLock::new(MetadataCache::new()),
110        db,
111    });
112
113    // Set up file watchers (keep handles alive)
114    let _watchers = watcher::setup_file_watcher(&tasks_dir, &projects_dir, tx);
115
116    // Build router with fallback to serve static files
117    let app = build_router(state.clone()).fallback_service(ServeDir::new(&public_dir));
118
119    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.port));
120    info!("Server running at http://localhost:{}", config.port);
121
122    // Open browser if requested (cross-platform)
123    if config.open_browser {
124        let url = format!("http://localhost:{}", config.port);
125        tokio::spawn(async move {
126            let _ = open_browser(&url).await;
127        });
128    }
129
130    let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
131        if e.kind() == std::io::ErrorKind::AddrInUse {
132            flow_core::FlowError::Io(std::io::Error::new(
133                std::io::ErrorKind::AddrInUse,
134                format!(
135                    "Port {} already in use. Try: flow serve --port {}",
136                    config.port,
137                    config.port + 1
138                ),
139            ))
140        } else {
141            flow_core::FlowError::Io(e)
142        }
143    })?;
144
145    axum::serve(listener, app)
146        .await
147        .map_err(|e| flow_core::FlowError::Io(std::io::Error::other(e)))?;
148
149    Ok(())
150}
151
152/// Open a URL in the default browser (cross-platform).
153async fn open_browser(url: &str) -> Result<(), std::io::Error> {
154    #[cfg(target_os = "macos")]
155    {
156        tokio::process::Command::new("open")
157            .arg(url)
158            .status()
159            .await?;
160    }
161    #[cfg(target_os = "linux")]
162    {
163        tokio::process::Command::new("xdg-open")
164            .arg(url)
165            .status()
166            .await?;
167    }
168    #[cfg(target_os = "windows")]
169    {
170        tokio::process::Command::new("cmd")
171            .args(["/C", "start", "", url])
172            .status()
173            .await?;
174    }
175    Ok(())
176}