hammerwork_web/
server.rs

1//! Web server implementation for the Hammerwork dashboard.
2//!
3//! This module provides the main `WebDashboard` struct for starting and configuring
4//! the web server, including database connections, authentication, and route setup.
5//!
6//! # Examples
7//!
8//! ## Basic Server Setup
9//!
10//! ```rust,no_run
11//! use hammerwork_web::{WebDashboard, DashboardConfig};
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
15//!     let config = DashboardConfig::new()
16//!         .with_bind_address("127.0.0.1", 8080)
17//!         .with_database_url("postgresql://localhost/hammerwork");
18//!
19//!     let dashboard = WebDashboard::new(config).await?;
20//!     dashboard.start().await?;
21//!
22//!     Ok(())
23//! }
24//! ```
25//!
26//! ## Server with Authentication
27//!
28//! ```rust,no_run
29//! use hammerwork_web::{WebDashboard, DashboardConfig};
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     let config = DashboardConfig::new()
34//!         .with_bind_address("0.0.0.0", 9090)
35//!         .with_database_url("postgresql://localhost/hammerwork")
36//!         .with_auth("admin", "$2b$12$hash...")
37//!         .with_cors(true);
38//!
39//!     let dashboard = WebDashboard::new(config).await?;
40//!     dashboard.start().await?;
41//!
42//!     Ok(())
43//! }
44//! ```
45
46use crate::{
47    Result, api,
48    auth::{AuthState, auth_filter, handle_auth_rejection},
49    config::DashboardConfig,
50    websocket::WebSocketState,
51};
52use hammerwork::JobQueue;
53use std::{net::SocketAddr, sync::Arc};
54use tokio::sync::RwLock;
55use tracing::{error, info};
56use warp::{Filter, Reply};
57
58/// Main web dashboard server.
59///
60/// The `WebDashboard` provides a complete web interface for monitoring and managing
61/// Hammerwork job queues. It includes REST API endpoints, WebSocket support for
62/// real-time updates, authentication, and a modern HTML/CSS/JS frontend.
63///
64/// # Examples
65///
66/// ```rust,no_run
67/// use hammerwork_web::{WebDashboard, DashboardConfig};
68/// use std::path::PathBuf;
69///
70/// #[tokio::main]
71/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
72///     let config = DashboardConfig::new()
73///         .with_bind_address("127.0.0.1", 8080)
74///         .with_database_url("postgresql://localhost/hammerwork")
75///         .with_static_dir(PathBuf::from("./assets"))
76///         .with_cors(false);
77///
78///     let dashboard = WebDashboard::new(config).await?;
79///     dashboard.start().await?;
80///
81///     Ok(())
82/// }
83/// ```
84pub struct WebDashboard {
85    config: DashboardConfig,
86    auth_state: AuthState,
87    websocket_state: Arc<RwLock<WebSocketState>>,
88}
89
90impl WebDashboard {
91    /// Create a new web dashboard instance.
92    ///
93    /// This initializes the dashboard with the provided configuration but does not
94    /// start the web server. Call `start()` to begin serving requests.
95    ///
96    /// # Examples
97    ///
98    /// ```rust,no_run
99    /// use hammerwork_web::{WebDashboard, DashboardConfig};
100    ///
101    /// #[tokio::main]
102    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
103    ///     let config = DashboardConfig::new()
104    ///         .with_database_url("postgresql://localhost/hammerwork");
105    ///
106    ///     let dashboard = WebDashboard::new(config).await?;
107    ///     // Dashboard is created but not yet started
108    ///     Ok(())
109    /// }
110    /// ```
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the configuration is invalid or if initialization fails.
115    pub async fn new(config: DashboardConfig) -> Result<Self> {
116        let auth_state = AuthState::new(config.auth.clone());
117        let websocket_state = Arc::new(RwLock::new(WebSocketState::new(config.websocket.clone())));
118
119        Ok(Self {
120            config,
121            auth_state,
122            websocket_state,
123        })
124    }
125
126    /// Start the web server
127    pub async fn start(self) -> Result<()> {
128        let bind_addr: SocketAddr = self.config.bind_addr().parse()?;
129
130        // Create job queue
131        let queue = Arc::new(self.create_job_queue().await?);
132
133        // Create API routes
134        let api_routes = Self::create_api_routes_static(queue.clone(), self.auth_state.clone());
135
136        // Create WebSocket routes
137        let websocket_routes = Self::create_websocket_routes_static(
138            self.websocket_state.clone(),
139            self.auth_state.clone(),
140        );
141
142        // Create static file routes
143        let static_routes = Self::create_static_routes_static(self.config.static_dir.clone())?;
144
145        // Combine all routes
146        let routes = api_routes
147            .or(websocket_routes)
148            .or(static_routes)
149            .recover(handle_auth_rejection);
150
151        // Apply CORS if enabled (simplified approach)
152        let routes = routes.with(if self.config.enable_cors {
153            warp::cors()
154                .allow_any_origin()
155                .allow_headers(vec!["content-type", "authorization"])
156                .allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
157        } else {
158            warp::cors().allow_origin("none") // Effectively disable CORS
159        });
160
161        info!("Starting web server on {}", bind_addr);
162
163        // Start cleanup task for auth state
164        let auth_state_cleanup = self.auth_state.clone();
165        tokio::spawn(async move {
166            let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); // 5 minutes
167            loop {
168                interval.tick().await;
169                auth_state_cleanup.cleanup_expired_attempts().await;
170            }
171        });
172
173        // Start WebSocket ping task
174        let websocket_state_ping = self.websocket_state.clone();
175        let ping_interval = self.config.websocket.ping_interval;
176        tokio::spawn(async move {
177            let mut interval = tokio::time::interval(ping_interval);
178            loop {
179                interval.tick().await;
180                let state = websocket_state_ping.read().await;
181                state.ping_all_connections().await;
182            }
183        });
184
185        // Start the server
186        warp::serve(routes).run(bind_addr).await;
187
188        Ok(())
189    }
190
191    /// Create job queue from database URL
192    async fn create_job_queue(&self) -> Result<QueueType> {
193        // Determine database type from URL and create appropriate queue
194        if self.config.database_url.starts_with("postgres") {
195            #[cfg(feature = "postgres")]
196            {
197                let pg_pool = sqlx::PgPool::connect(&self.config.database_url).await?;
198                info!(
199                    "Connected to PostgreSQL with {} connections",
200                    self.config.pool_size
201                );
202                Ok(JobQueue::new(pg_pool))
203            }
204            #[cfg(not(feature = "postgres"))]
205            {
206                return Err(anyhow::anyhow!(
207                    "PostgreSQL support not enabled. Rebuild with --features postgres"
208                ));
209            }
210        } else if self.config.database_url.starts_with("mysql") {
211            #[cfg(feature = "mysql")]
212            {
213                #[cfg(all(feature = "mysql", not(feature = "postgres")))]
214                {
215                    let mysql_pool = sqlx::MySqlPool::connect(&self.config.database_url).await?;
216                    info!(
217                        "Connected to MySQL with {} connections",
218                        self.config.pool_size
219                    );
220                    Ok(JobQueue::new(mysql_pool))
221                }
222                #[cfg(all(feature = "postgres", feature = "mysql"))]
223                {
224                    return Err(anyhow::anyhow!(
225                        "MySQL database URL provided but PostgreSQL is the default when both features are enabled"
226                    ));
227                }
228            }
229            #[cfg(not(feature = "mysql"))]
230            {
231                return Err(anyhow::anyhow!(
232                    "MySQL support not enabled. Rebuild with --features mysql"
233                ));
234            }
235        } else {
236            Err(anyhow::anyhow!("Unsupported database URL format"))
237        }
238    }
239
240    /// Create API routes with authentication
241    fn create_api_routes_static(
242        queue: Arc<QueueType>,
243        auth_state: AuthState,
244    ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
245        // Health check endpoint (no auth required)
246        let health = warp::path("health")
247            .and(warp::path::end())
248            .and(warp::get())
249            .map(|| {
250                warp::reply::json(&serde_json::json!({
251                    "status": "healthy",
252                    "timestamp": chrono::Utc::now().to_rfc3339(),
253                    "version": env!("CARGO_PKG_VERSION")
254                }))
255            });
256
257        // API routes (require authentication)
258        let api_routes = api::queues::routes(queue.clone())
259            .or(api::jobs::routes(queue.clone()))
260            .or(api::stats::routes(queue.clone()))
261            .or(api::system::routes(queue.clone()))
262            .or(api::archive::archive_routes(queue.clone()))
263            .or(api::spawn::spawn_routes(queue));
264
265        let authenticated_api = warp::path("api")
266            .and(auth_filter(auth_state))
267            .untuple_one()
268            .and(api_routes);
269
270        health.or(authenticated_api)
271    }
272
273    /// Create WebSocket routes with authentication
274    fn create_websocket_routes_static(
275        websocket_state: Arc<RwLock<WebSocketState>>,
276        auth_state: AuthState,
277    ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
278        warp::path("ws")
279            .and(warp::path::end())
280            .and(auth_filter(auth_state))
281            .and(warp::ws())
282            .and(warp::any().map(move || websocket_state.clone()))
283            .map(
284                |_: (), ws: warp::ws::Ws, websocket_state: Arc<RwLock<WebSocketState>>| {
285                    ws.on_upgrade(move |socket| async move {
286                        let mut state = websocket_state.write().await;
287                        if let Err(e) = state.handle_connection(socket).await {
288                            error!("WebSocket error: {}", e);
289                        }
290                    })
291                },
292            )
293    }
294
295    /// Create static file serving routes
296    fn create_static_routes_static(
297        static_dir: std::path::PathBuf,
298    ) -> Result<impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone> {
299        // Serve static files
300        let static_files = warp::path("static").and(warp::fs::dir(static_dir.clone()));
301
302        // Serve index.html at root
303        let index = warp::path::end().and(warp::fs::file(static_dir.join("index.html")));
304
305        // Catch-all for SPA routing - serve index.html
306        let spa_routes = warp::any().and(warp::fs::file(static_dir.join("index.html")));
307
308        Ok(index.or(static_files).or(spa_routes))
309    }
310}
311
312#[cfg(all(feature = "postgres", not(feature = "mysql")))]
313type QueueType = JobQueue<sqlx::Postgres>;
314
315#[cfg(all(feature = "mysql", not(feature = "postgres")))]
316type QueueType = JobQueue<sqlx::MySql>;
317
318#[cfg(all(feature = "postgres", feature = "mysql"))]
319type QueueType = JobQueue<sqlx::Postgres>; // Default to PostgreSQL when both are enabled
320
321#[cfg(all(not(feature = "postgres"), not(feature = "mysql")))]
322compile_error!("At least one database feature (postgres or mysql) must be enabled");
323
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::DashboardConfig;
    use tempfile::tempdir;

    /// `WebDashboard::new` only prepares in-memory state, so it must succeed with
    /// a default configuration pointing at an empty static directory.
    #[tokio::test]
    async fn test_dashboard_creation() {
        let static_root = tempdir().unwrap();
        let static_path = static_root.path().to_path_buf();

        let creation = WebDashboard::new(DashboardConfig::new().with_static_dir(static_path)).await;

        assert!(creation.is_ok());
    }

    /// The `with_cors` builder method must set the `enable_cors` flag.
    #[test]
    fn test_cors_configuration() {
        let cors_enabled = DashboardConfig::new().with_cors(true).enable_cors;
        assert!(cors_enabled);
    }
}