liquid_cache_server/admin_server/
handlers.rs

1use std::{
2    fs,
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use axum::{
8    Json,
9    extract::{Query, State},
10};
11use datafusion::{
12    catalog::memory::DataSourceExec,
13    common::{
14        stats::Precision,
15        tree_node::{TreeNode, TreeNodeRecursion},
16    },
17    datasource::physical_plan::FileScanConfig,
18    physical_plan::ExecutionPlan,
19};
20use liquid_cache_common::rpc::ExecutionMetricsResponse;
21use liquid_cache_parquet::LiquidParquetSource;
22use log::info;
23use serde::Serialize;
24use uuid::Uuid;
25
26use crate::{
27    ColumnStatistics, ExecutionPlanWithStats, ExecutionStatsWithPlan, MetricValues, PlanInfo,
28    SchemaField, Statistics,
29};
30
31use super::{
32    AppState,
33    models::{ApiResponse, ExecutionStats},
34};
35
36pub(crate) async fn shutdown_handler() -> Json<ApiResponse> {
37    info!("Shutdown request received, shutting down server...");
38
39    tokio::spawn(async {
40        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
41        std::process::exit(0);
42    });
43
44    Json(ApiResponse {
45        message: "Server shutting down...".to_string(),
46        status: "success".to_string(),
47    })
48}
49
/// POST handler that clears all entries from the liquid cache.
pub(crate) async fn reset_cache_handler(State(state): State<Arc<AppState>>) -> Json<ApiResponse> {
    info!("Resetting cache...");
    let cache = state.liquid_cache.cache();
    // SAFETY: `reset` is declared `unsafe` upstream; presumably it requires that
    // no concurrent readers hold references into cached data while it runs —
    // TODO(review): confirm the invariant against the LiquidCache API docs.
    unsafe {
        cache.reset();
    }

    Json(ApiResponse {
        message: "Cache reset successfully".to_string(),
        status: "success".to_string(),
    })
}
62
/// JSON payload describing on-disk usage of the parquet cache directory.
#[derive(Serialize)]
pub(crate) struct ParquetCacheUsage {
    // Directory that was scanned (lossy UTF-8 rendering of the path).
    directory: String,
    // Number of regular files found, recursively.
    file_count: usize,
    // Sum of all file sizes in bytes, recursively.
    total_size_bytes: u64,
    // Outcome marker consumed by the API client.
    status: String,
}
70
71fn get_parquet_cache_usage_inner(cache_dir: &Path) -> ParquetCacheUsage {
72    let mut file_count = 0;
73    let mut total_size: u64 = 0;
74
75    fn walk_dir(dir: &Path) -> Result<(usize, u64), std::io::Error> {
76        let mut count = 0;
77        let mut size = 0;
78
79        if dir.exists() {
80            for entry in fs::read_dir(dir)? {
81                let entry = entry?;
82                let path = entry.path();
83
84                if path.is_file() {
85                    count += 1;
86                    let metadata = fs::metadata(&path)?;
87                    size += metadata.len();
88                } else if path.is_dir() {
89                    let (sub_count, sub_size) = walk_dir(&path)?;
90                    count += sub_count;
91                    size += sub_size;
92                }
93            }
94        }
95
96        Ok((count, size))
97    }
98
99    if let Ok((count, size)) = walk_dir(cache_dir) {
100        file_count = count;
101        total_size = size;
102    }
103
104    ParquetCacheUsage {
105        directory: cache_dir.to_string_lossy().to_string(),
106        file_count,
107        total_size_bytes: total_size,
108        status: "success".to_string(),
109    }
110}
111
112pub(crate) async fn get_parquet_cache_usage_handler(
113    State(state): State<Arc<AppState>>,
114) -> Json<ParquetCacheUsage> {
115    info!("Getting parquet cache usage...");
116    let cache_dir = state.liquid_cache.get_parquet_cache_dir();
117    let usage = get_parquet_cache_usage_inner(cache_dir);
118    Json(usage)
119}
120
/// JSON payload summarizing cache configuration and current usage.
#[derive(Serialize)]
pub(crate) struct CacheInfo {
    // Configured batch size of the cache.
    batch_size: usize,
    // Configured in-memory capacity limit, in bytes.
    max_cache_bytes: u64,
    // Current in-memory usage, in bytes.
    memory_usage_bytes: u64,
    // Current on-disk usage, in bytes.
    disk_usage_bytes: u64,
}
128
129pub(crate) async fn get_cache_info_handler(State(state): State<Arc<AppState>>) -> Json<CacheInfo> {
130    info!("Getting cache info...");
131    let cache = state.liquid_cache.cache();
132    let batch_size = cache.batch_size();
133    let max_cache_bytes = cache.max_cache_bytes() as u64;
134    let memory_usage_bytes = cache.memory_usage_bytes() as u64;
135    let disk_usage_bytes = cache.disk_usage_bytes() as u64;
136    Json(CacheInfo {
137        batch_size,
138        max_cache_bytes,
139        memory_usage_bytes,
140        disk_usage_bytes,
141    })
142}
143
/// JSON payload with host-level and server-process resource information.
#[derive(Serialize)]
pub(crate) struct SystemInfo {
    // Total physical memory of the host, in bytes.
    total_memory_bytes: u64,
    // Memory currently in use on the host, in bytes.
    used_memory_bytes: u64,
    // Memory available for allocation on the host, in bytes.
    available_memory_bytes: u64,
    // OS name (empty string if unavailable).
    name: String,
    // Kernel version string (empty string if unavailable).
    kernel: String,
    // OS version string (empty string if unavailable).
    os: String,
    // Machine hostname (empty string if unavailable).
    host_name: String,
    // Physical core count (0 if it cannot be determined).
    cpu_cores: usize,
    // Resident-set size of this server process, in bytes.
    server_resident_memory_bytes: u64,
    // Virtual memory size of this server process, in bytes.
    server_virtual_memory_bytes: u64,
}
157
158pub(crate) async fn get_system_info_handler(
159    State(_state): State<Arc<AppState>>,
160) -> Json<SystemInfo> {
161    info!("Getting system info...");
162    let mut sys = sysinfo::System::new_all();
163    sys.refresh_all();
164    let current_pid = sysinfo::get_current_pid().unwrap();
165    let process = sys.process(current_pid).unwrap();
166    let resident_memory = process.memory();
167    let virtual_memory = process.virtual_memory();
168    Json(SystemInfo {
169        total_memory_bytes: sys.total_memory(),
170        used_memory_bytes: sys.used_memory(),
171        available_memory_bytes: sys.available_memory(),
172        name: sysinfo::System::name().unwrap_or_default(),
173        kernel: sysinfo::System::kernel_version().unwrap_or_default(),
174        os: sysinfo::System::os_version().unwrap_or_default(),
175        host_name: sysinfo::System::host_name().unwrap_or_default(),
176        cpu_cores: sysinfo::System::physical_core_count().unwrap_or(0),
177        server_resident_memory_bytes: resident_memory,
178        server_virtual_memory_bytes: virtual_memory,
179    })
180}
181
/// Query parameters for [`stop_trace_handler`].
#[derive(serde::Deserialize)]
pub(crate) struct TraceParams {
    // Directory in which to save the trace file.
    path: String,
}
186
/// Query parameters for [`get_execution_metrics_handler`].
#[derive(serde::Deserialize)]
pub(crate) struct ExecutionMetricsParams {
    // UUID (string form) of the plan whose metrics are requested.
    plan_id: String,
}
191
/// Query parameters for [`get_cache_stats_handler`].
#[derive(serde::Deserialize)]
pub(crate) struct CacheStatsParams {
    // Directory in which to save the stats file.
    path: String,
}
196
197pub(crate) async fn start_trace_handler(State(state): State<Arc<AppState>>) -> Json<ApiResponse> {
198    info!("Starting cache trace collection...");
199    let cache = state.liquid_cache.cache();
200    cache.enable_trace();
201
202    Json(ApiResponse {
203        message: "Cache trace collection started".to_string(),
204        status: "success".to_string(),
205    })
206}
207
208pub(crate) async fn stop_trace_handler(
209    Query(params): Query<TraceParams>,
210    State(state): State<Arc<AppState>>,
211) -> Json<ApiResponse> {
212    info!("Stopping cache trace collection...");
213    let save_path = Path::new(&params.path);
214
215    match save_trace_to_file(save_path, &state) {
216        Ok(_) => Json(ApiResponse {
217            message: format!(
218                "Cache trace collection stopped, saved to {}",
219                save_path.display()
220            ),
221            status: "success".to_string(),
222        }),
223        Err(e) => Json(ApiResponse {
224            message: format!("Failed to save trace: {e}"),
225            status: "error".to_string(),
226        }),
227    }
228}
229
230pub(crate) fn save_trace_to_file(
231    save_dir: &Path,
232    state: &AppState,
233) -> Result<(), Box<dyn std::error::Error>> {
234    let now = std::time::SystemTime::now();
235    let datetime = now.duration_since(std::time::UNIX_EPOCH).unwrap();
236    let minute = (datetime.as_secs() / 60) % 60;
237    let second = datetime.as_secs() % 60;
238    let trace_id = state
239        .trace_id
240        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
241    let filename = format!("cache-trace-id{trace_id:02}-{minute:02}-{second:03}.parquet",);
242
243    // Ensure directory exists
244    if !save_dir.exists() {
245        fs::create_dir_all(save_dir)?;
246    }
247
248    let file_path = save_dir.join(filename);
249    let cache = state.liquid_cache.cache();
250    cache.disable_trace();
251    cache.flush_trace(&file_path);
252    Ok(())
253}
254
255pub(crate) async fn get_execution_metrics_handler(
256    State(state): State<Arc<AppState>>,
257    Query(params): Query<ExecutionMetricsParams>,
258) -> Json<Option<ExecutionMetricsResponse>> {
259    let Ok(uuid) = Uuid::parse_str(&params.plan_id) else {
260        return Json(None);
261    };
262    let metrics = state.liquid_cache.inner().get_metrics(&uuid);
263    Json(metrics)
264}
265
266pub(crate) fn get_cache_stats_inner(
267    cache: &liquid_cache_parquet::LiquidCacheRef,
268    save_dir: impl AsRef<Path>,
269    state: &AppState,
270) -> Result<PathBuf, Box<dyn std::error::Error>> {
271    let now = std::time::SystemTime::now();
272    let datetime = now.duration_since(std::time::UNIX_EPOCH).unwrap();
273    let minute = (datetime.as_secs() / 60) % 60;
274    let second = datetime.as_secs() % 60;
275    let trace_id = state
276        .stats_id
277        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
278    let filename = format!("cache-stats-id{trace_id:02}-{minute:02}-{second:03}.parquet",);
279    let file_path = save_dir.as_ref().join(filename);
280    cache.write_stats(&file_path)?;
281    Ok(file_path)
282}
283
284pub(crate) async fn get_cache_stats_handler(
285    State(state): State<Arc<AppState>>,
286    Query(params): Query<CacheStatsParams>,
287) -> Json<ApiResponse> {
288    let cache = state.liquid_cache.cache();
289    match get_cache_stats_inner(cache, &params.path, &state) {
290        Ok(file_path) => {
291            info!("Cache stats saved to {}", file_path.display());
292            Json(ApiResponse {
293                message: format!("Cache stats saved to {}", file_path.display()),
294                status: "success".to_string(),
295            })
296        }
297        Err(e) => Json(ApiResponse {
298            message: format!("Failed to get cache stats: {e}"),
299            status: "error".to_string(),
300        }),
301    }
302}
303
304pub(crate) async fn start_flamegraph_handler(
305    State(state): State<Arc<AppState>>,
306) -> Json<ApiResponse> {
307    info!("Starting flamegraph collection...");
308    state.flamegraph.start();
309    Json(ApiResponse {
310        message: "Flamegraph collection started".to_string(),
311        status: "success".to_string(),
312    })
313}
314
315impl From<&Arc<dyn ExecutionPlan>> for ExecutionPlanWithStats {
316    fn from(plan: &Arc<dyn ExecutionPlan>) -> Self {
317        let metrics = plan.metrics().unwrap().aggregate_by_name();
318        let mut metric_values = Vec::new();
319        for metric in metrics.iter() {
320            metric_values.push(MetricValues {
321                name: metric.value().name().to_string(),
322                value: metric.value().to_string(),
323            });
324        }
325
326        let mut column_statistics = Vec::new();
327        for (i, cs) in plan
328            .partition_statistics(None)
329            .unwrap()
330            .column_statistics
331            .iter()
332            .enumerate()
333        {
334            let min = if cs.min_value != Precision::Absent {
335                Some(cs.min_value.to_string())
336            } else {
337                None
338            };
339            let max = if cs.max_value != Precision::Absent {
340                Some(cs.max_value.to_string())
341            } else {
342                None
343            };
344            let sum = if cs.sum_value != Precision::Absent {
345                Some(cs.sum_value.to_string())
346            } else {
347                None
348            };
349            let distinct = if cs.distinct_count != Precision::Absent {
350                Some(cs.distinct_count.to_string())
351            } else {
352                None
353            };
354            let null = if cs.null_count != Precision::Absent {
355                Some(cs.null_count.to_string())
356            } else {
357                None
358            };
359            column_statistics.push(ColumnStatistics {
360                name: format!("col_{i}"),
361                null,
362                min,
363                max,
364                sum,
365                distinct_count: distinct,
366            });
367        }
368
369        ExecutionPlanWithStats {
370            name: plan.name().to_string(),
371            schema: plan
372                .schema()
373                .fields()
374                .iter()
375                .map(|field| SchemaField {
376                    name: field.name().to_string(),
377                    data_type: field.data_type().to_string(),
378                })
379                .collect(),
380            statistics: Statistics {
381                num_rows: plan
382                    .partition_statistics(None)
383                    .unwrap()
384                    .num_rows
385                    .to_string(),
386                total_byte_size: plan
387                    .partition_statistics(None)
388                    .unwrap()
389                    .total_byte_size
390                    .to_string(),
391                column_statistics,
392            },
393            metrics: metric_values,
394            children: plan
395                .children()
396                .iter()
397                .map(|child| (*child).into())
398                .collect(),
399        }
400    }
401}
402
403fn get_liquid_exec_info(plan: &Arc<dyn ExecutionPlan>) -> Option<String> {
404    let mut rv = None;
405    plan.apply(|node| {
406        let Some(data_source) = node.as_any().downcast_ref::<DataSourceExec>() else {
407            return Ok(TreeNodeRecursion::Continue);
408        };
409        let file_scan_config = data_source
410            .data_source()
411            .as_any()
412            .downcast_ref::<FileScanConfig>()
413            .expect("FileScanConfig not found");
414        let Some(liquid_source) = file_scan_config
415            .file_source()
416            .as_any()
417            .downcast_ref::<LiquidParquetSource>()
418        else {
419            return Ok(TreeNodeRecursion::Continue);
420        };
421        let predicate = liquid_source.predicate();
422
423        rv = predicate.map(|v| v.to_string());
424        Ok(TreeNodeRecursion::Stop)
425    })
426    .unwrap();
427    rv
428}
429
430pub(crate) async fn get_execution_stats(
431    State(state): State<Arc<AppState>>,
432) -> Json<Vec<ExecutionStatsWithPlan>> {
433    let execution_stats = state.liquid_cache.inner().get_execution_stats();
434    let mut rv = Vec::new();
435    for execution_stat in execution_stats {
436        let mut plans = Vec::new();
437        for plan_id in execution_stat.plan_ids.iter() {
438            let uuid = Uuid::parse_str(plan_id).expect("Invalid plan ID");
439            let plan = state
440                .liquid_cache
441                .inner()
442                .get_plan(&uuid)
443                .expect("Plan not found");
444            let model_plan = ExecutionPlanWithStats::from(&plan.plan);
445            let plan_info = PlanInfo {
446                id: plan_id.to_string(),
447                created_at: plan
448                    .created_at
449                    .duration_since(std::time::UNIX_EPOCH)
450                    .unwrap()
451                    .as_secs(),
452                plan: model_plan,
453                predicate: get_liquid_exec_info(&plan.plan),
454            };
455            plans.push(plan_info);
456        }
457        let execution_stats_with_plan = ExecutionStatsWithPlan {
458            execution_stats: execution_stat,
459            plans,
460        };
461        rv.push(execution_stats_with_plan);
462    }
463    Json(rv)
464}
465
466pub(crate) async fn stop_flamegraph_handler(
467    State(state): State<Arc<AppState>>,
468) -> Json<ApiResponse> {
469    let svg_content = if let Ok(svg_content) = state.flamegraph.stop_to_string() {
470        svg_content
471    } else {
472        return Json(ApiResponse {
473            message: "Flamegraph not generated".to_string(),
474            status: "error".to_string(),
475        });
476    };
477    Json(ApiResponse {
478        message: svg_content,
479        status: "success".to_string(),
480    })
481}
482
483pub(crate) async fn add_execution_stats_handler(
484    State(state): State<Arc<AppState>>,
485    Json(params): Json<ExecutionStats>,
486) -> Json<ApiResponse> {
487    let message = format!(
488        "Execution stats added for execution {}",
489        params.display_name
490    );
491    state.liquid_cache.inner().add_execution_stats(params);
492    Json(ApiResponse {
493        message,
494        status: "success".to_string(),
495    })
496}
497
498pub(crate) async fn start_disk_usage_monitor_handler(
499    State(state): State<Arc<AppState>>,
500) -> Json<ApiResponse> {
501    state.disk_monitor.clone().start_recording();
502    let message = "Successfully started disk usage monitoring";
503    Json(ApiResponse {
504        message: message.to_string(),
505        status: "success".to_string(),
506    })
507}
508
509pub(crate) async fn stop_disk_usage_monitor_handler(
510    State(state): State<Arc<AppState>>,
511) -> Json<ApiResponse> {
512    state.disk_monitor.clone().stop_recording();
513    let message = "Stopped disk usage monitoring";
514    Json(ApiResponse {
515        message: message.to_string(),
516        status: "success".to_string(),
517    })
518}
519
#[cfg(test)]
mod tests {
    use std::{io::Write, path::PathBuf};

    use tempfile::tempdir;

    use super::*;

    /// Files in nested subdirectories are counted and sized recursively.
    #[test]
    fn test_get_parquet_cache_usage_inner() {
        let temp_dir = tempdir().unwrap();
        let temp_path = temp_dir.path();

        let file1_path = temp_path.join("file1.parquet");
        let file2_path = temp_path.join("file2.parquet");

        // One file lives in a subdirectory to exercise the recursive walk.
        let subdir_path = temp_path.join("subdir");
        std::fs::create_dir(&subdir_path).unwrap();
        let file3_path = subdir_path.join("file3.parquet");

        let data1 = [1u8; 1000];
        let data2 = [2u8; 2000];
        let data3 = [3u8; 3000];

        let mut file1 = std::fs::File::create(&file1_path).unwrap();
        file1.write_all(&data1).unwrap();

        let mut file2 = std::fs::File::create(&file2_path).unwrap();
        file2.write_all(&data2).unwrap();

        let mut file3 = std::fs::File::create(&file3_path).unwrap();
        file3.write_all(&data3).unwrap();

        // Expected total size: 6000 bytes (1000 + 2000 + 3000)

        let result = get_parquet_cache_usage_inner(temp_path);
        assert_eq!(result.directory, temp_path.to_string_lossy().to_string());
        assert_eq!(result.file_count, 3);
        assert_eq!(result.total_size_bytes, 6000);
        assert_eq!(result.status, "success");
    }

    /// An existing but empty directory reports zero files and zero bytes.
    #[test]
    fn test_get_parquet_cache_usage_inner_empty_dir() {
        let temp_dir = tempdir().unwrap();
        let temp_path = temp_dir.path();
        let result = get_parquet_cache_usage_inner(temp_path);
        assert_eq!(result.file_count, 0);
        assert_eq!(result.total_size_bytes, 0);
    }

    /// A nonexistent directory is not an error: it reports zero usage.
    #[test]
    fn test_get_parquet_cache_usage_inner_nonexistent_dir() {
        let nonexistent_path = PathBuf::from("/path/does/not/exist");
        let result = get_parquet_cache_usage_inner(&nonexistent_path);
        assert_eq!(result.file_count, 0);
        assert_eq!(result.total_size_bytes, 0);
    }
}