liquid_cache_server/admin_server/
mod.rs

1//! Admin server for the liquid cache server
2//!
3//! This server is used to manage the liquid cache server
4
5use axum::http::{HeaderValue, Method};
6use axum::{
7    Router,
8    routing::{get, post},
9};
10use flamegraph::FlameGraph;
11use std::sync::atomic::AtomicU32;
12use std::{net::SocketAddr, sync::Arc};
13use tower_http::cors::CorsLayer;
14
15mod disk_monitor;
16mod flamegraph;
17mod handlers;
18pub(crate) mod models;
19
20use crate::LiquidCacheService;
21use crate::admin_server::disk_monitor::DiskMonitor;
22
23pub(crate) struct AppState {
24    liquid_cache: Arc<LiquidCacheService>,
25    trace_id: AtomicU32,
26    stats_id: AtomicU32,
27    flamegraph: Arc<FlameGraph>,
28    disk_monitor: Arc<DiskMonitor>,
29}
30
31/// Run the admin server
32pub async fn run_admin_server(
33    addr: SocketAddr,
34    liquid_cache: Arc<LiquidCacheService>,
35) -> Result<(), Box<dyn std::error::Error>> {
36    let state = Arc::new(AppState {
37        liquid_cache,
38        trace_id: AtomicU32::new(0),
39        stats_id: AtomicU32::new(0),
40        flamegraph: Arc::new(FlameGraph::new()),
41        disk_monitor: Arc::new(DiskMonitor::new()),
42    });
43
44    // Create a CORS layer that allows all localhost origins
45    let cors = CorsLayer::new()
46        // Allow all localhost origins (http and https)
47        .allow_origin([
48            "http://localhost:3000".parse::<HeaderValue>().unwrap(),
49            "http://127.0.0.1:3000".parse::<HeaderValue>().unwrap(),
50            "https://liquid-cache-admin.xiangpeng.systems"
51                .parse::<HeaderValue>()
52                .unwrap(),
53        ])
54        .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE])
55        .allow_headers([axum::http::header::CONTENT_TYPE]);
56
57    let app = Router::new()
58        .route("/shutdown", get(handlers::shutdown_handler))
59        .route("/reset_cache", get(handlers::reset_cache_handler))
60        .route(
61            "/parquet_cache_usage",
62            get(handlers::get_parquet_cache_usage_handler),
63        )
64        .route("/cache_info", get(handlers::get_cache_info_handler))
65        .route("/system_info", get(handlers::get_system_info_handler))
66        .route("/start_trace", get(handlers::start_trace_handler))
67        .route("/stop_trace", get(handlers::stop_trace_handler))
68        .route(
69            "/execution_metrics",
70            get(handlers::get_execution_metrics_handler),
71        )
72        .route("/execution_plans", get(handlers::get_execution_stats))
73        .route("/cache_stats", get(handlers::get_cache_stats_handler))
74        .route("/start_flamegraph", get(handlers::start_flamegraph_handler))
75        .route("/stop_flamegraph", get(handlers::stop_flamegraph_handler))
76        .route(
77            "/set_execution_stats",
78            post(handlers::add_execution_stats_handler),
79        )
80        .route(
81            "/start_disk_usage_monitor",
82            get(handlers::start_disk_usage_monitor_handler),
83        )
84        .route(
85            "/stop_disk_usage_monitor",
86            get(handlers::stop_disk_usage_monitor_handler),
87        )
88        .with_state(state)
89        .layer(cors);
90
91    let listener = tokio::net::TcpListener::bind(addr).await?;
92    axum::serve(listener, app).await?;
93
94    Ok(())
95}