1use crate::{
47 Result, api,
48 auth::{AuthState, auth_filter, handle_auth_rejection},
49 config::DashboardConfig,
50 websocket::WebSocketState,
51};
52use hammerwork::JobQueue;
53use std::{net::SocketAddr, sync::Arc};
54use tokio::sync::RwLock;
55use tracing::{error, info};
56use warp::{Filter, Reply};
57
58pub struct WebDashboard {
85 config: DashboardConfig,
86 auth_state: AuthState,
87 websocket_state: Arc<RwLock<WebSocketState>>,
88}
89
90impl WebDashboard {
91 pub async fn new(config: DashboardConfig) -> Result<Self> {
116 let auth_state = AuthState::new(config.auth.clone());
117 let websocket_state = Arc::new(RwLock::new(WebSocketState::new(config.websocket.clone())));
118
119 Ok(Self {
120 config,
121 auth_state,
122 websocket_state,
123 })
124 }
125
126 pub async fn start(self) -> Result<()> {
128 let bind_addr: SocketAddr = self.config.bind_addr().parse()?;
129
130 let queue = Arc::new(self.create_job_queue().await?);
132
133 let api_routes = Self::create_api_routes_static(queue.clone(), self.auth_state.clone());
135
136 let websocket_routes = Self::create_websocket_routes_static(
138 self.websocket_state.clone(),
139 self.auth_state.clone(),
140 );
141
142 let static_routes = Self::create_static_routes_static(self.config.static_dir.clone())?;
144
145 let routes = api_routes
147 .or(websocket_routes)
148 .or(static_routes)
149 .recover(handle_auth_rejection);
150
151 let routes = routes.with(if self.config.enable_cors {
153 warp::cors()
154 .allow_any_origin()
155 .allow_headers(vec!["content-type", "authorization"])
156 .allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
157 } else {
158 warp::cors().allow_origin("none") });
160
161 info!("Starting web server on {}", bind_addr);
162
163 let auth_state_cleanup = self.auth_state.clone();
165 tokio::spawn(async move {
166 let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); loop {
168 interval.tick().await;
169 auth_state_cleanup.cleanup_expired_attempts().await;
170 }
171 });
172
173 let websocket_state_ping = self.websocket_state.clone();
175 let ping_interval = self.config.websocket.ping_interval;
176 tokio::spawn(async move {
177 let mut interval = tokio::time::interval(ping_interval);
178 loop {
179 interval.tick().await;
180 let state = websocket_state_ping.read().await;
181 state.ping_all_connections().await;
182 }
183 });
184
185 warp::serve(routes).run(bind_addr).await;
187
188 Ok(())
189 }
190
191 async fn create_job_queue(&self) -> Result<QueueType> {
193 if self.config.database_url.starts_with("postgres") {
195 #[cfg(feature = "postgres")]
196 {
197 let pg_pool = sqlx::PgPool::connect(&self.config.database_url).await?;
198 info!(
199 "Connected to PostgreSQL with {} connections",
200 self.config.pool_size
201 );
202 Ok(JobQueue::new(pg_pool))
203 }
204 #[cfg(not(feature = "postgres"))]
205 {
206 return Err(anyhow::anyhow!(
207 "PostgreSQL support not enabled. Rebuild with --features postgres"
208 ));
209 }
210 } else if self.config.database_url.starts_with("mysql") {
211 #[cfg(feature = "mysql")]
212 {
213 #[cfg(all(feature = "mysql", not(feature = "postgres")))]
214 {
215 let mysql_pool = sqlx::MySqlPool::connect(&self.config.database_url).await?;
216 info!(
217 "Connected to MySQL with {} connections",
218 self.config.pool_size
219 );
220 Ok(JobQueue::new(mysql_pool))
221 }
222 #[cfg(all(feature = "postgres", feature = "mysql"))]
223 {
224 return Err(anyhow::anyhow!(
225 "MySQL database URL provided but PostgreSQL is the default when both features are enabled"
226 ));
227 }
228 }
229 #[cfg(not(feature = "mysql"))]
230 {
231 return Err(anyhow::anyhow!(
232 "MySQL support not enabled. Rebuild with --features mysql"
233 ));
234 }
235 } else {
236 Err(anyhow::anyhow!("Unsupported database URL format"))
237 }
238 }
239
240 fn create_api_routes_static(
242 queue: Arc<QueueType>,
243 auth_state: AuthState,
244 ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
245 let health = warp::path("health")
247 .and(warp::path::end())
248 .and(warp::get())
249 .map(|| {
250 warp::reply::json(&serde_json::json!({
251 "status": "healthy",
252 "timestamp": chrono::Utc::now().to_rfc3339(),
253 "version": env!("CARGO_PKG_VERSION")
254 }))
255 });
256
257 let api_routes = api::queues::routes(queue.clone())
259 .or(api::jobs::routes(queue.clone()))
260 .or(api::stats::routes(queue.clone()))
261 .or(api::system::routes(queue.clone()))
262 .or(api::archive::archive_routes(queue.clone()))
263 .or(api::spawn::spawn_routes(queue));
264
265 let authenticated_api = warp::path("api")
266 .and(auth_filter(auth_state))
267 .untuple_one()
268 .and(api_routes);
269
270 health.or(authenticated_api)
271 }
272
273 fn create_websocket_routes_static(
275 websocket_state: Arc<RwLock<WebSocketState>>,
276 auth_state: AuthState,
277 ) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
278 warp::path("ws")
279 .and(warp::path::end())
280 .and(auth_filter(auth_state))
281 .and(warp::ws())
282 .and(warp::any().map(move || websocket_state.clone()))
283 .map(
284 |_: (), ws: warp::ws::Ws, websocket_state: Arc<RwLock<WebSocketState>>| {
285 ws.on_upgrade(move |socket| async move {
286 let mut state = websocket_state.write().await;
287 if let Err(e) = state.handle_connection(socket).await {
288 error!("WebSocket error: {}", e);
289 }
290 })
291 },
292 )
293 }
294
295 fn create_static_routes_static(
297 static_dir: std::path::PathBuf,
298 ) -> Result<impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone> {
299 let static_files = warp::path("static").and(warp::fs::dir(static_dir.clone()));
301
302 let index = warp::path::end().and(warp::fs::file(static_dir.join("index.html")));
304
305 let spa_routes = warp::any().and(warp::fs::file(static_dir.join("index.html")));
307
308 Ok(index.or(static_files).or(spa_routes))
309 }
310}
311
312#[cfg(all(feature = "postgres", not(feature = "mysql")))]
313type QueueType = JobQueue<sqlx::Postgres>;
314
315#[cfg(all(feature = "mysql", not(feature = "postgres")))]
316type QueueType = JobQueue<sqlx::MySql>;
317
318#[cfg(all(feature = "postgres", feature = "mysql"))]
319type QueueType = JobQueue<sqlx::Postgres>; #[cfg(all(not(feature = "postgres"), not(feature = "mysql")))]
322compile_error!("At least one database feature (postgres or mysql) must be enabled");
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use crate::config::DashboardConfig;
328 use tempfile::tempdir;
329
330 #[tokio::test]
331 async fn test_dashboard_creation() {
332 let temp_dir = tempdir().unwrap();
333 let config = DashboardConfig::new().with_static_dir(temp_dir.path().to_path_buf());
334
335 let dashboard = WebDashboard::new(config).await;
336 assert!(dashboard.is_ok());
337 }
338
339 #[test]
340 fn test_cors_configuration() {
341 let config = DashboardConfig::new().with_cors(true);
342 assert!(config.enable_cors);
343 }
344}