Skip to main content

hammerwork_web/
server.rs

1//! Web server implementation for the Hammerwork dashboard.
2//!
3//! This module provides the main `WebDashboard` struct for starting and configuring
4//! the web server, including database connections, authentication, and route setup.
5//!
6//! # Examples
7//!
8//! ## Basic Server Setup
9//!
10//! ```rust,no_run
11//! use hammerwork_web::{WebDashboard, DashboardConfig};
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
15//!     let config = DashboardConfig::new()
16//!         .with_bind_address("127.0.0.1", 8080)
17//!         .with_database_url("postgresql://localhost/hammerwork");
18//!
19//!     let dashboard = WebDashboard::new(config).await?;
20//!     dashboard.start().await?;
21//!
22//!     Ok(())
23//! }
24//! ```
25//!
26//! ## Server with Authentication
27//!
28//! ```rust,no_run
29//! use hammerwork_web::{WebDashboard, DashboardConfig};
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     let config = DashboardConfig::new()
34//!         .with_bind_address("0.0.0.0", 9090)
35//!         .with_database_url("postgresql://localhost/hammerwork")
36//!         .with_auth("admin", "$2b$12$hash...")
37//!         .with_cors(true);
38//!
39//!     let dashboard = WebDashboard::new(config).await?;
40//!     dashboard.start().await?;
41//!
42//!     Ok(())
43//! }
44//! ```
45
46use crate::{
47    Result, api,
48    api::system::SystemState,
49    auth::{AuthState, auth_filter, handle_auth_rejection},
50    config::DashboardConfig,
51    websocket::WebSocketState,
52};
53use hammerwork::JobQueue;
54use std::{net::SocketAddr, sync::Arc};
55use tokio::sync::RwLock;
56use tracing::{error, info};
57use warp::{Filter, Reply};
58
59/// Main web dashboard server.
60///
61/// The `WebDashboard` provides a complete web interface for monitoring and managing
62/// Hammerwork job queues. It includes REST API endpoints, WebSocket support for
63/// real-time updates, authentication, and a modern HTML/CSS/JS frontend.
64///
65/// # Examples
66///
67/// ```rust,no_run
68/// use hammerwork_web::{WebDashboard, DashboardConfig};
69/// use std::path::PathBuf;
70///
71/// #[tokio::main]
72/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
73///     let config = DashboardConfig::new()
74///         .with_bind_address("127.0.0.1", 8080)
75///         .with_database_url("postgresql://localhost/hammerwork")
76///         .with_static_dir(PathBuf::from("./assets"))
77///         .with_cors(false);
78///
79///     let dashboard = WebDashboard::new(config).await?;
80///     dashboard.start().await?;
81///
82///     Ok(())
83/// }
84/// ```
85pub struct WebDashboard {
86    config: DashboardConfig,
87    auth_state: AuthState,
88    websocket_state: Arc<RwLock<WebSocketState>>,
89}
90
91impl WebDashboard {
92    /// Create a new web dashboard instance.
93    ///
94    /// This initializes the dashboard with the provided configuration but does not
95    /// start the web server. Call `start()` to begin serving requests.
96    ///
97    /// # Examples
98    ///
99    /// ```rust,no_run
100    /// use hammerwork_web::{WebDashboard, DashboardConfig};
101    ///
102    /// #[tokio::main]
103    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
104    ///     let config = DashboardConfig::new()
105    ///         .with_database_url("postgresql://localhost/hammerwork");
106    ///
107    ///     let dashboard = WebDashboard::new(config).await?;
108    ///     // Dashboard is created but not yet started
109    ///     Ok(())
110    /// }
111    /// ```
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the configuration is invalid or if initialization fails.
116    pub async fn new(config: DashboardConfig) -> Result<Self> {
117        let auth_state = AuthState::new(config.auth.clone());
118        let websocket_state = Arc::new(RwLock::new(WebSocketState::new(config.websocket.clone())));
119
120        Ok(Self {
121            config,
122            auth_state,
123            websocket_state,
124        })
125    }
126
127    /// Start the web server
128    pub async fn start(self) -> Result<()> {
129        let bind_addr: SocketAddr = self.config.bind_addr().parse()?;
130
131        // Detect database type and create job queue
132        let (queue, database_type) = self.create_job_queue_with_type().await?;
133        let queue = Arc::new(queue);
134
135        // Create system state
136        let system_state = Arc::new(RwLock::new(SystemState::new(
137            self.config.clone(),
138            database_type,
139            self.config.pool_size,
140        )));
141
142        // Create API routes
143        let api_routes = Self::create_api_routes_static(
144            queue.clone(),
145            self.auth_state.clone(),
146            system_state.clone(),
147        );
148
149        // Create WebSocket routes
150        let websocket_routes = Self::create_websocket_routes_static(
151            self.websocket_state.clone(),
152            self.auth_state.clone(),
153        );
154
155        // Create static file routes
156        let static_routes = Self::create_static_routes_static(self.config.static_dir.clone())?;
157
158        // Combine all routes
159        let routes = api_routes
160            .or(websocket_routes)
161            .or(static_routes)
162            .recover(handle_auth_rejection);
163
164        // Apply CORS if enabled (simplified approach)
165        let routes = routes.with(if self.config.enable_cors {
166            warp::cors()
167                .allow_any_origin()
168                .allow_headers(vec!["content-type", "authorization"])
169                .allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
170        } else {
171            warp::cors().allow_origin("none") // Effectively disable CORS
172        });
173
174        info!("Starting web server on {}", bind_addr);
175
176        // Start cleanup task for auth state
177        let auth_state_cleanup = self.auth_state.clone();
178        tokio::spawn(async move {
179            let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); // 5 minutes
180            loop {
181                interval.tick().await;
182                auth_state_cleanup.cleanup_expired_attempts().await;
183            }
184        });
185
186        // Start WebSocket ping task
187        let websocket_state_ping = self.websocket_state.clone();
188        let ping_interval = self.config.websocket.ping_interval;
189        tokio::spawn(async move {
190            let mut interval = tokio::time::interval(ping_interval);
191            loop {
192                interval.tick().await;
193                let state = websocket_state_ping.read().await;
194                state.ping_all_connections().await;
195            }
196        });
197
198        // Start WebSocket broadcast listener
199        let websocket_state_broadcast = self.websocket_state.clone();
200        WebSocketState::start_broadcast_listener(websocket_state_broadcast).await?;
201
202        // Start the server
203        warp::serve(routes).run(bind_addr).await;
204
205        Ok(())
206    }
207
208    /// Create job queue from database URL and return database type
209    async fn create_job_queue_with_type(&self) -> Result<(QueueType, String)> {
210        // Determine database type from URL and create appropriate queue
211        if self.config.database_url.starts_with("postgres") {
212            #[cfg(feature = "postgres")]
213            {
214                let pg_pool = sqlx::PgPool::connect(&self.config.database_url).await?;
215                info!(
216                    "Connected to PostgreSQL with {} connections",
217                    self.config.pool_size
218                );
219                Ok((JobQueue::new(pg_pool), "PostgreSQL".to_string()))
220            }
221            #[cfg(not(feature = "postgres"))]
222            {
223                return Err(anyhow::anyhow!(
224                    "PostgreSQL support not enabled. Rebuild with --features postgres"
225                ));
226            }
227        } else if self.config.database_url.starts_with("mysql") {
228            #[cfg(feature = "mysql")]
229            {
230                #[cfg(all(feature = "mysql", not(feature = "postgres")))]
231                {
232                    let mysql_pool = sqlx::MySqlPool::connect(&self.config.database_url).await?;
233                    info!(
234                        "Connected to MySQL with {} connections",
235                        self.config.pool_size
236                    );
237                    Ok((JobQueue::new(mysql_pool), "MySQL".to_string()))
238                }
239                #[cfg(all(feature = "postgres", feature = "mysql"))]
240                {
241                    return Err(anyhow::anyhow!(
242                        "MySQL database URL provided but PostgreSQL is the default when both features are enabled"
243                    ));
244                }
245            }
246            #[cfg(not(feature = "mysql"))]
247            {
248                return Err(anyhow::anyhow!(
249                    "MySQL support not enabled. Rebuild with --features mysql"
250                ));
251            }
252        } else {
253            Err(anyhow::anyhow!("Unsupported database URL format"))
254        }
255    }
256
257    /// Create API routes with authentication
258    fn create_api_routes_static(
259        queue: Arc<QueueType>,
260        auth_state: AuthState,
261        system_state: Arc<RwLock<SystemState>>,
262    ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
263        // Health check endpoint (no auth required)
264        let health = warp::path("health")
265            .and(warp::path::end())
266            .and(warp::get())
267            .map(|| {
268                warp::reply::json(&serde_json::json!({
269                    "status": "healthy",
270                    "timestamp": chrono::Utc::now().to_rfc3339(),
271                    "version": env!("CARGO_PKG_VERSION")
272                }))
273            });
274
275        // API routes (require authentication)
276        let api_routes = api::queues::routes(queue.clone())
277            .or(api::jobs::routes(queue.clone()))
278            .or(api::stats::routes(queue.clone(), system_state.clone()))
279            .or(api::system::routes(queue.clone(), system_state))
280            .or(api::archive::archive_routes(queue.clone()))
281            .or(api::spawn::spawn_routes(queue));
282
283        let authenticated_api = warp::path("api")
284            .and(auth_filter(auth_state))
285            .untuple_one()
286            .and(api_routes);
287
288        health.or(authenticated_api)
289    }
290
291    /// Create WebSocket routes with authentication
292    fn create_websocket_routes_static(
293        websocket_state: Arc<RwLock<WebSocketState>>,
294        auth_state: AuthState,
295    ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
296        warp::path("ws")
297            .and(warp::path::end())
298            .and(auth_filter(auth_state))
299            .and(warp::ws())
300            .and(warp::any().map(move || websocket_state.clone()))
301            .map(
302                |_: (), ws: warp::ws::Ws, websocket_state: Arc<RwLock<WebSocketState>>| {
303                    ws.on_upgrade(move |socket| async move {
304                        let mut state = websocket_state.write().await;
305                        if let Err(e) = state.handle_connection(socket).await {
306                            error!("WebSocket error: {}", e);
307                        }
308                    })
309                },
310            )
311    }
312
313    /// Create static file serving routes
314    fn create_static_routes_static(
315        static_dir: std::path::PathBuf,
316    ) -> Result<impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone> {
317        // Serve static files
318        let static_files = warp::path("static").and(warp::fs::dir(static_dir.clone()));
319
320        // Serve index.html at root
321        let index = warp::path::end().and(warp::fs::file(static_dir.join("index.html")));
322
323        // Catch-all for SPA routing - serve index.html
324        let spa_routes = warp::any().and(warp::fs::file(static_dir.join("index.html")));
325
326        Ok(index.or(static_files).or(spa_routes))
327    }
328}
329
330#[cfg(all(feature = "postgres", not(feature = "mysql")))]
331type QueueType = JobQueue<sqlx::Postgres>;
332
333#[cfg(all(feature = "mysql", not(feature = "postgres")))]
334type QueueType = JobQueue<sqlx::MySql>;
335
336#[cfg(all(feature = "postgres", feature = "mysql"))]
337type QueueType = JobQueue<sqlx::Postgres>; // Default to PostgreSQL when both are enabled
338
339#[cfg(all(not(feature = "postgres"), not(feature = "mysql")))]
340compile_error!("At least one database feature (postgres or mysql) must be enabled");
341
342#[cfg(test)]
343mod tests {
344    use super::*;
345    use crate::config::DashboardConfig;
346    use tempfile::tempdir;
347
348    #[tokio::test]
349    async fn test_dashboard_creation() {
350        let temp_dir = tempdir().unwrap();
351        let config = DashboardConfig::new().with_static_dir(temp_dir.path().to_path_buf());
352
353        let dashboard = WebDashboard::new(config).await;
354        assert!(dashboard.is_ok());
355    }
356
357    #[test]
358    fn test_cors_configuration() {
359        let config = DashboardConfig::new().with_cors(true);
360        assert!(config.enable_cors);
361    }
362}