1use super::ApiResponse;
124use hammerwork::queue::DatabaseQueue;
125use serde::{Deserialize, Serialize};
126use std::sync::Arc;
127use tokio::sync::RwLock;
128use warp::{Filter, Reply};
129
130#[derive(Clone)]
132pub struct SystemState {
133 pub started_at: chrono::DateTime<chrono::Utc>,
135 pub config: crate::DashboardConfig,
137 pub database_type: String,
139 pub pool_size: u32,
141}
142
143impl SystemState {
144 pub fn new(config: crate::DashboardConfig, database_type: String, pool_size: u32) -> Self {
145 Self {
146 started_at: chrono::Utc::now(),
147 config,
148 database_type,
149 pool_size,
150 }
151 }
152
153 pub fn uptime_seconds(&self) -> i64 {
154 (chrono::Utc::now() - self.started_at).num_seconds()
155 }
156}
157
158#[derive(Debug, Serialize)]
160pub struct SystemInfo {
161 pub version: String,
162 pub build_info: BuildInfo,
163 pub runtime_info: RuntimeInfo,
164 pub database_info: DatabaseInfo,
165 pub features: Vec<String>,
166 pub uptime_seconds: u64,
167 pub started_at: chrono::DateTime<chrono::Utc>,
168}
169
170#[derive(Debug, Serialize)]
172pub struct BuildInfo {
173 pub version: String,
174 pub git_commit: Option<String>,
175 pub build_date: Option<String>,
176 pub rust_version: String,
177 pub target_triple: String,
178}
179
180#[derive(Debug, Serialize)]
182pub struct RuntimeInfo {
183 pub process_id: u32,
184 pub memory_usage_bytes: Option<u64>,
185 pub cpu_usage_percent: Option<f64>,
186 pub thread_count: Option<usize>,
187 pub gc_collections: Option<u64>,
188}
189
190#[derive(Debug, Serialize)]
192pub struct DatabaseInfo {
193 pub database_type: String,
194 pub connection_url: String, pub pool_size: u32,
196 pub active_connections: Option<u32>,
197 pub connection_health: bool,
198 pub last_migration: Option<String>,
199}
200
201#[derive(Debug, Serialize)]
203pub struct ServerConfig {
204 pub bind_address: String,
205 pub port: u16,
206 pub authentication_enabled: bool,
207 pub cors_enabled: bool,
208 pub websocket_max_connections: usize,
209 pub static_assets_path: String,
210}
211
212#[derive(Debug, Serialize)]
214pub struct MetricsInfo {
215 pub prometheus_enabled: bool,
216 pub metrics_endpoint: String,
217 pub custom_metrics_count: u32,
218 pub last_scrape: Option<chrono::DateTime<chrono::Utc>>,
219}
220
221#[derive(Debug, Deserialize)]
223pub struct ConfigUpdateRequest {
224 pub setting: String,
225 pub value: serde_json::Value,
226}
227
228#[derive(Debug, Deserialize)]
230pub struct MaintenanceRequest {
231 pub operation: String, pub target: Option<String>, pub dry_run: Option<bool>,
234}
235
236pub fn routes<T>(
238 queue: Arc<T>,
239 system_state: Arc<RwLock<SystemState>>,
240) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
241where
242 T: DatabaseQueue + Send + Sync + 'static,
243{
244 let queue_filter = warp::any().map(move || queue.clone());
245 let state_filter = warp::any().map(move || system_state.clone());
246
247 let info = warp::path("system")
248 .and(warp::path("info"))
249 .and(warp::path::end())
250 .and(warp::get())
251 .and(queue_filter.clone())
252 .and(state_filter.clone())
253 .and_then(system_info_handler);
254
255 let config = warp::path("system")
256 .and(warp::path("config"))
257 .and(warp::path::end())
258 .and(warp::get())
259 .and(state_filter.clone())
260 .and_then(system_config_handler);
261
262 let metrics_info = warp::path("system")
263 .and(warp::path("metrics"))
264 .and(warp::path::end())
265 .and(warp::get())
266 .and(state_filter.clone())
267 .and_then(metrics_info_handler);
268
269 let maintenance = warp::path("system")
270 .and(warp::path("maintenance"))
271 .and(warp::path::end())
272 .and(warp::post())
273 .and(queue_filter.clone())
274 .and(warp::body::json())
275 .and_then(maintenance_handler);
276
277 let version = warp::path("version")
278 .and(warp::path::end())
279 .and(warp::get())
280 .and_then(version_handler);
281
282 info.or(config).or(metrics_info).or(maintenance).or(version)
283}
284
285async fn system_info_handler<T>(
287 queue: Arc<T>,
288 system_state: Arc<RwLock<SystemState>>,
289) -> Result<impl Reply, warp::Rejection>
290where
291 T: DatabaseQueue + Send + Sync,
292{
293 let database_healthy = queue.get_all_queue_stats().await.is_ok();
295
296 let build_info = BuildInfo {
297 version: env!("CARGO_PKG_VERSION").to_string(),
298 git_commit: option_env!("GIT_COMMIT").map(|s| s.to_string()),
299 build_date: option_env!("BUILD_DATE").map(|s| s.to_string()),
300 rust_version: get_rust_version(),
301 target_triple: get_target_triple(),
302 };
303
304 let runtime_info = RuntimeInfo {
305 process_id: std::process::id(),
306 memory_usage_bytes: get_memory_usage(),
307 cpu_usage_percent: None, thread_count: None, gc_collections: None, };
311
312 let state = system_state.read().await;
313
314 let database_info = DatabaseInfo {
315 database_type: state.database_type.clone(),
316 connection_url: "***masked***".to_string(),
317 pool_size: state.pool_size,
318 active_connections: None, connection_health: database_healthy,
320 last_migration: None, };
322
323 let features = vec![
324 #[cfg(feature = "postgres")]
325 "postgres".to_string(),
326 #[cfg(feature = "mysql")]
327 "mysql".to_string(),
328 #[cfg(feature = "auth")]
329 "auth".to_string(),
330 ];
331
332 let system_info = SystemInfo {
333 version: env!("CARGO_PKG_VERSION").to_string(),
334 build_info,
335 runtime_info,
336 database_info,
337 features,
338 uptime_seconds: state.uptime_seconds() as u64,
339 started_at: state.started_at,
340 };
341
342 Ok(warp::reply::json(&ApiResponse::success(system_info)))
343}
344
345async fn system_config_handler(
347 system_state: Arc<RwLock<SystemState>>,
348) -> Result<impl Reply, warp::Rejection> {
349 let state = system_state.read().await;
350 let config = ServerConfig {
351 bind_address: state.config.bind_address.clone(),
352 port: state.config.port,
353 authentication_enabled: state.config.auth.enabled,
354 cors_enabled: state.config.enable_cors,
355 websocket_max_connections: state.config.websocket.max_connections,
356 static_assets_path: state.config.static_dir.to_string_lossy().to_string(),
357 };
358
359 Ok(warp::reply::json(&ApiResponse::success(config)))
360}
361
362async fn metrics_info_handler(
364 system_state: Arc<RwLock<SystemState>>,
365) -> Result<impl Reply, warp::Rejection> {
366 let state = system_state.read().await;
367
368 let metrics_info = MetricsInfo {
369 prometheus_enabled: cfg!(feature = "metrics"),
370 metrics_endpoint: "/metrics".to_string(),
371 custom_metrics_count: get_custom_metrics_count(),
372 last_scrape: get_last_scrape_time().await,
373 };
374
375 Ok(warp::reply::json(&ApiResponse::success(metrics_info)))
376}
377
378async fn maintenance_handler<T>(
380 queue: Arc<T>,
381 request: MaintenanceRequest,
382) -> Result<impl Reply, warp::Rejection>
383where
384 T: DatabaseQueue + Send + Sync,
385{
386 let dry_run = request.dry_run.unwrap_or(false);
387
388 match request.operation.as_str() {
389 "cleanup" => {
390 if dry_run {
391 let response = ApiResponse::success(serde_json::json!({
392 "operation": "cleanup",
393 "dry_run": true,
394 "message": "Dry run: Would clean up old completed and dead jobs",
395 "estimated_deletions": 0
396 }));
397 Ok(warp::reply::json(&response))
398 } else {
399 let older_than = chrono::Utc::now() - chrono::Duration::days(7); match queue.purge_dead_jobs(older_than).await {
402 Ok(count) => {
403 let response = ApiResponse::success(serde_json::json!({
404 "operation": "cleanup",
405 "dry_run": false,
406 "message": format!("Cleaned up {} dead jobs", count),
407 "deletions": count
408 }));
409 Ok(warp::reply::json(&response))
410 }
411 Err(e) => {
412 let response = ApiResponse::<()>::error(format!("Cleanup failed: {}", e));
413 Ok(warp::reply::json(&response))
414 }
415 }
416 }
417 }
418 "vacuum" => {
419 let response =
421 ApiResponse::<()>::error("Vacuum operation not yet implemented".to_string());
422 Ok(warp::reply::json(&response))
423 }
424 "reindex" => {
425 let response =
427 ApiResponse::<()>::error("Reindex operation not yet implemented".to_string());
428 Ok(warp::reply::json(&response))
429 }
430 "optimize" => {
431 let response =
433 ApiResponse::<()>::error("Optimize operation not yet implemented".to_string());
434 Ok(warp::reply::json(&response))
435 }
436 _ => {
437 let response = ApiResponse::<()>::error(format!(
438 "Unknown maintenance operation: {}",
439 request.operation
440 ));
441 Ok(warp::reply::json(&response))
442 }
443 }
444}
445
446async fn version_handler() -> Result<impl Reply, warp::Rejection> {
448 let version_info = serde_json::json!({
449 "version": env!("CARGO_PKG_VERSION"),
450 "name": env!("CARGO_PKG_NAME"),
451 "description": env!("CARGO_PKG_DESCRIPTION"),
452 "authors": env!("CARGO_PKG_AUTHORS").split(':').collect::<Vec<_>>(),
453 "repository": env!("CARGO_PKG_REPOSITORY"),
454 "license": env!("CARGO_PKG_LICENSE"),
455 "rust_version": get_rust_version(),
456 "build_target": get_target_triple(),
457 });
458
459 Ok(warp::reply::json(&ApiResponse::success(version_info)))
460}
461
462fn get_rust_version() -> String {
464 option_env!("RUSTC_VERSION")
465 .unwrap_or("unknown")
466 .to_string()
467}
468
469fn get_target_triple() -> String {
471 std::env::consts::ARCH.to_string() + "-" + std::env::consts::OS
472}
473
474fn get_memory_usage() -> Option<u64> {
476 #[cfg(target_os = "linux")]
477 {
478 use std::fs;
479 if let Ok(contents) = fs::read_to_string("/proc/self/status") {
480 for line in contents.lines() {
481 if line.starts_with("VmRSS:") {
482 if let Some(kb) = line
483 .split_whitespace()
484 .nth(1)
485 .and_then(|s| s.parse::<u64>().ok())
486 {
487 return Some(kb * 1024); }
489 }
490 }
491 }
492 }
493
494 #[cfg(target_os = "macos")]
495 {
496 }
499
500 #[cfg(target_os = "windows")]
501 {
502 }
505
506 None
507}
508
509fn get_custom_metrics_count() -> u32 {
511 #[cfg(feature = "metrics")]
512 {
513 0
518 }
519
520 #[cfg(not(feature = "metrics"))]
521 {
522 0
523 }
524}
525
526async fn get_last_scrape_time() -> Option<chrono::DateTime<chrono::Utc>> {
528 #[cfg(feature = "metrics")]
529 {
530 None
534 }
535
536 #[cfg(not(feature = "metrics"))]
537 {
538 None
539 }
540}
541
542#[cfg(test)]
543mod tests {
544 use super::*;
545
546 #[test]
547 fn test_maintenance_request_deserialization() {
548 let json = r#"{
549 "operation": "cleanup",
550 "target": "completed_jobs",
551 "dry_run": true
552 }"#;
553
554 let request: MaintenanceRequest = serde_json::from_str(json).unwrap();
555 assert_eq!(request.operation, "cleanup");
556 assert_eq!(request.target, Some("completed_jobs".to_string()));
557 assert_eq!(request.dry_run, Some(true));
558 }
559
560 #[test]
561 fn test_build_info_creation() {
562 let build_info = BuildInfo {
563 version: "1.0.0".to_string(),
564 git_commit: Some("abc123".to_string()),
565 build_date: Some("2024-01-01".to_string()),
566 rust_version: "1.70.0".to_string(),
567 target_triple: "x86_64-unknown-linux-gnu".to_string(),
568 };
569
570 let json = serde_json::to_string(&build_info).unwrap();
571 assert!(json.contains("1.0.0"));
572 assert!(json.contains("abc123"));
573 }
574
575 #[test]
576 fn test_get_rust_version() {
577 let version = get_rust_version();
578 assert!(!version.is_empty());
579 }
580
581 #[test]
582 fn test_get_target_triple() {
583 let target = get_target_triple();
584 assert!(!target.is_empty());
585 assert!(target.contains("-"));
586 }
587}