1use crate::{
47 Result, api,
48 api::system::SystemState,
49 auth::{AuthState, auth_filter, handle_auth_rejection},
50 config::DashboardConfig,
51 websocket::WebSocketState,
52};
53use hammerwork::JobQueue;
54use std::{net::SocketAddr, sync::Arc};
55use tokio::sync::RwLock;
56use tracing::{error, info};
57use warp::{Filter, Reply};
58
59pub struct WebDashboard {
86 config: DashboardConfig,
87 auth_state: AuthState,
88 websocket_state: Arc<RwLock<WebSocketState>>,
89}
90
91impl WebDashboard {
92 pub async fn new(config: DashboardConfig) -> Result<Self> {
117 let auth_state = AuthState::new(config.auth.clone());
118 let websocket_state = Arc::new(RwLock::new(WebSocketState::new(config.websocket.clone())));
119
120 Ok(Self {
121 config,
122 auth_state,
123 websocket_state,
124 })
125 }
126
127 pub async fn start(self) -> Result<()> {
129 let bind_addr: SocketAddr = self.config.bind_addr().parse()?;
130
131 let (queue, database_type) = self.create_job_queue_with_type().await?;
133 let queue = Arc::new(queue);
134
135 let system_state = Arc::new(RwLock::new(SystemState::new(
137 self.config.clone(),
138 database_type,
139 self.config.pool_size,
140 )));
141
142 let api_routes = Self::create_api_routes_static(
144 queue.clone(),
145 self.auth_state.clone(),
146 system_state.clone(),
147 );
148
149 let websocket_routes = Self::create_websocket_routes_static(
151 self.websocket_state.clone(),
152 self.auth_state.clone(),
153 );
154
155 let static_routes = Self::create_static_routes_static(self.config.static_dir.clone())?;
157
158 let routes = api_routes
160 .or(websocket_routes)
161 .or(static_routes)
162 .recover(handle_auth_rejection);
163
164 let routes = routes.with(if self.config.enable_cors {
166 warp::cors()
167 .allow_any_origin()
168 .allow_headers(vec!["content-type", "authorization"])
169 .allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
170 } else {
171 warp::cors().allow_origin("none") });
173
174 info!("Starting web server on {}", bind_addr);
175
176 let auth_state_cleanup = self.auth_state.clone();
178 tokio::spawn(async move {
179 let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); loop {
181 interval.tick().await;
182 auth_state_cleanup.cleanup_expired_attempts().await;
183 }
184 });
185
186 let websocket_state_ping = self.websocket_state.clone();
188 let ping_interval = self.config.websocket.ping_interval;
189 tokio::spawn(async move {
190 let mut interval = tokio::time::interval(ping_interval);
191 loop {
192 interval.tick().await;
193 let state = websocket_state_ping.read().await;
194 state.ping_all_connections().await;
195 }
196 });
197
198 let websocket_state_broadcast = self.websocket_state.clone();
200 WebSocketState::start_broadcast_listener(websocket_state_broadcast).await?;
201
202 warp::serve(routes).run(bind_addr).await;
204
205 Ok(())
206 }
207
208 async fn create_job_queue_with_type(&self) -> Result<(QueueType, String)> {
210 if self.config.database_url.starts_with("postgres") {
212 #[cfg(feature = "postgres")]
213 {
214 let pg_pool = sqlx::PgPool::connect(&self.config.database_url).await?;
215 info!(
216 "Connected to PostgreSQL with {} connections",
217 self.config.pool_size
218 );
219 Ok((JobQueue::new(pg_pool), "PostgreSQL".to_string()))
220 }
221 #[cfg(not(feature = "postgres"))]
222 {
223 return Err(anyhow::anyhow!(
224 "PostgreSQL support not enabled. Rebuild with --features postgres"
225 ));
226 }
227 } else if self.config.database_url.starts_with("mysql") {
228 #[cfg(feature = "mysql")]
229 {
230 #[cfg(all(feature = "mysql", not(feature = "postgres")))]
231 {
232 let mysql_pool = sqlx::MySqlPool::connect(&self.config.database_url).await?;
233 info!(
234 "Connected to MySQL with {} connections",
235 self.config.pool_size
236 );
237 Ok((JobQueue::new(mysql_pool), "MySQL".to_string()))
238 }
239 #[cfg(all(feature = "postgres", feature = "mysql"))]
240 {
241 return Err(anyhow::anyhow!(
242 "MySQL database URL provided but PostgreSQL is the default when both features are enabled"
243 ));
244 }
245 }
246 #[cfg(not(feature = "mysql"))]
247 {
248 return Err(anyhow::anyhow!(
249 "MySQL support not enabled. Rebuild with --features mysql"
250 ));
251 }
252 } else {
253 Err(anyhow::anyhow!("Unsupported database URL format"))
254 }
255 }
256
257 fn create_api_routes_static(
259 queue: Arc<QueueType>,
260 auth_state: AuthState,
261 system_state: Arc<RwLock<SystemState>>,
262 ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
263 let health = warp::path("health")
265 .and(warp::path::end())
266 .and(warp::get())
267 .map(|| {
268 warp::reply::json(&serde_json::json!({
269 "status": "healthy",
270 "timestamp": chrono::Utc::now().to_rfc3339(),
271 "version": env!("CARGO_PKG_VERSION")
272 }))
273 });
274
275 let api_routes = api::queues::routes(queue.clone())
277 .or(api::jobs::routes(queue.clone()))
278 .or(api::stats::routes(queue.clone(), system_state.clone()))
279 .or(api::system::routes(queue.clone(), system_state))
280 .or(api::archive::archive_routes(queue.clone()))
281 .or(api::spawn::spawn_routes(queue));
282
283 let authenticated_api = warp::path("api")
284 .and(auth_filter(auth_state))
285 .untuple_one()
286 .and(api_routes);
287
288 health.or(authenticated_api)
289 }
290
291 fn create_websocket_routes_static(
293 websocket_state: Arc<RwLock<WebSocketState>>,
294 auth_state: AuthState,
295 ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
296 warp::path("ws")
297 .and(warp::path::end())
298 .and(auth_filter(auth_state))
299 .and(warp::ws())
300 .and(warp::any().map(move || websocket_state.clone()))
301 .map(
302 |_: (), ws: warp::ws::Ws, websocket_state: Arc<RwLock<WebSocketState>>| {
303 ws.on_upgrade(move |socket| async move {
304 let mut state = websocket_state.write().await;
305 if let Err(e) = state.handle_connection(socket).await {
306 error!("WebSocket error: {}", e);
307 }
308 })
309 },
310 )
311 }
312
313 fn create_static_routes_static(
315 static_dir: std::path::PathBuf,
316 ) -> Result<impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone> {
317 let static_files = warp::path("static").and(warp::fs::dir(static_dir.clone()));
319
320 let index = warp::path::end().and(warp::fs::file(static_dir.join("index.html")));
322
323 let spa_routes = warp::any().and(warp::fs::file(static_dir.join("index.html")));
325
326 Ok(index.or(static_files).or(spa_routes))
327 }
328}
329
330#[cfg(all(feature = "postgres", not(feature = "mysql")))]
331type QueueType = JobQueue<sqlx::Postgres>;
332
333#[cfg(all(feature = "mysql", not(feature = "postgres")))]
334type QueueType = JobQueue<sqlx::MySql>;
335
336#[cfg(all(feature = "postgres", feature = "mysql"))]
337type QueueType = JobQueue<sqlx::Postgres>; #[cfg(all(not(feature = "postgres"), not(feature = "mysql")))]
340compile_error!("At least one database feature (postgres or mysql) must be enabled");
341
342#[cfg(test)]
343mod tests {
344 use super::*;
345 use crate::config::DashboardConfig;
346 use tempfile::tempdir;
347
348 #[tokio::test]
349 async fn test_dashboard_creation() {
350 let temp_dir = tempdir().unwrap();
351 let config = DashboardConfig::new().with_static_dir(temp_dir.path().to_path_buf());
352
353 let dashboard = WebDashboard::new(config).await;
354 assert!(dashboard.is_ok());
355 }
356
357 #[test]
358 fn test_cors_configuration() {
359 let config = DashboardConfig::new().with_cors(true);
360 assert!(config.enable_cors);
361 }
362}