1use std::{
2 fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use axum::{
8 Json,
9 extract::{Query, State},
10};
11use datafusion::{
12 catalog::memory::DataSourceExec,
13 common::{
14 stats::Precision,
15 tree_node::{TreeNode, TreeNodeRecursion},
16 },
17 datasource::physical_plan::FileScanConfig,
18 physical_plan::ExecutionPlan,
19};
20use liquid_cache_common::rpc::ExecutionMetricsResponse;
21use liquid_cache_parquet::LiquidParquetSource;
22use log::info;
23use serde::Serialize;
24use uuid::Uuid;
25
26use crate::{
27 ColumnStatistics, ExecutionPlanWithStats, ExecutionStatsWithPlan, MetricValues, PlanInfo,
28 SchemaField, Statistics,
29};
30
31use super::{
32 AppState,
33 models::{ApiResponse, ExecutionStats},
34};
35
36pub(crate) async fn shutdown_handler() -> Json<ApiResponse> {
37 info!("Shutdown request received, shutting down server...");
38
39 tokio::spawn(async {
40 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
41 std::process::exit(0);
42 });
43
44 Json(ApiResponse {
45 message: "Server shutting down...".to_string(),
46 status: "success".to_string(),
47 })
48}
49
/// HTTP handler: clears the liquid cache and reports the outcome.
pub(crate) async fn reset_cache_handler(State(state): State<Arc<AppState>>) -> Json<ApiResponse> {
    info!("Resetting cache...");
    let cache = state.liquid_cache.cache();
    // SAFETY: `reset` is declared unsafe by the cache API. Presumably it
    // requires that no in-flight readers hold references into the cache while
    // it runs — TODO confirm the exact invariant against the cache's docs.
    unsafe {
        cache.reset();
    }

    Json(ApiResponse {
        message: "Cache reset successfully".to_string(),
        status: "success".to_string(),
    })
}
62
/// JSON payload describing the on-disk parquet cache footprint.
#[derive(Serialize)]
pub(crate) struct ParquetCacheUsage {
    // Scanned cache directory (lossy UTF-8 rendering of the path).
    directory: String,
    // Number of files found, counted recursively.
    file_count: usize,
    // Sum of file sizes in bytes, counted recursively.
    total_size_bytes: u64,
    // Always "success"; scan errors are reported as zero usage instead.
    status: String,
}
70
71fn get_parquet_cache_usage_inner(cache_dir: &Path) -> ParquetCacheUsage {
72 let mut file_count = 0;
73 let mut total_size: u64 = 0;
74
75 fn walk_dir(dir: &Path) -> Result<(usize, u64), std::io::Error> {
76 let mut count = 0;
77 let mut size = 0;
78
79 if dir.exists() {
80 for entry in fs::read_dir(dir)? {
81 let entry = entry?;
82 let path = entry.path();
83
84 if path.is_file() {
85 count += 1;
86 let metadata = fs::metadata(&path)?;
87 size += metadata.len();
88 } else if path.is_dir() {
89 let (sub_count, sub_size) = walk_dir(&path)?;
90 count += sub_count;
91 size += sub_size;
92 }
93 }
94 }
95
96 Ok((count, size))
97 }
98
99 if let Ok((count, size)) = walk_dir(cache_dir) {
100 file_count = count;
101 total_size = size;
102 }
103
104 ParquetCacheUsage {
105 directory: cache_dir.to_string_lossy().to_string(),
106 file_count,
107 total_size_bytes: total_size,
108 status: "success".to_string(),
109 }
110}
111
112pub(crate) async fn get_parquet_cache_usage_handler(
113 State(state): State<Arc<AppState>>,
114) -> Json<ParquetCacheUsage> {
115 info!("Getting parquet cache usage...");
116 let cache_dir = state.liquid_cache.get_parquet_cache_dir();
117 let usage = get_parquet_cache_usage_inner(cache_dir);
118 Json(usage)
119}
120
/// JSON payload with the cache's configuration and current footprint.
#[derive(Serialize)]
pub(crate) struct CacheInfo {
    // Batch size reported by the cache (presumably rows per cached batch —
    // confirm against the cache API).
    batch_size: usize,
    // Configured upper bound on cache size, in bytes.
    max_cache_bytes: u64,
    // Current in-memory usage, in bytes.
    memory_usage_bytes: u64,
    // Current on-disk usage, in bytes.
    disk_usage_bytes: u64,
}
128
129pub(crate) async fn get_cache_info_handler(State(state): State<Arc<AppState>>) -> Json<CacheInfo> {
130 info!("Getting cache info...");
131 let cache = state.liquid_cache.cache();
132 let batch_size = cache.batch_size();
133 let max_cache_bytes = cache.max_cache_bytes() as u64;
134 let memory_usage_bytes = cache.memory_usage_bytes() as u64;
135 let disk_usage_bytes = cache.disk_usage_bytes() as u64;
136 Json(CacheInfo {
137 batch_size,
138 max_cache_bytes,
139 memory_usage_bytes,
140 disk_usage_bytes,
141 })
142}
143
/// JSON payload with host-level and server-process-level resource figures.
#[derive(Serialize)]
pub(crate) struct SystemInfo {
    // Total physical memory on the host, in bytes.
    total_memory_bytes: u64,
    // Memory currently in use on the host, in bytes.
    used_memory_bytes: u64,
    // Memory currently available on the host, in bytes.
    available_memory_bytes: u64,
    // OS name (empty string if unavailable).
    name: String,
    // Kernel version (empty string if unavailable).
    kernel: String,
    // OS version (empty string if unavailable).
    os: String,
    // Host name (empty string if unavailable).
    host_name: String,
    // Physical core count (0 if unavailable).
    cpu_cores: usize,
    // Resident (RSS) memory of this server process, in bytes.
    server_resident_memory_bytes: u64,
    // Virtual memory of this server process, in bytes.
    server_virtual_memory_bytes: u64,
}
157
158pub(crate) async fn get_system_info_handler(
159 State(_state): State<Arc<AppState>>,
160) -> Json<SystemInfo> {
161 info!("Getting system info...");
162 let mut sys = sysinfo::System::new_all();
163 sys.refresh_all();
164 let current_pid = sysinfo::get_current_pid().unwrap();
165 let process = sys.process(current_pid).unwrap();
166 let resident_memory = process.memory();
167 let virtual_memory = process.virtual_memory();
168 Json(SystemInfo {
169 total_memory_bytes: sys.total_memory(),
170 used_memory_bytes: sys.used_memory(),
171 available_memory_bytes: sys.available_memory(),
172 name: sysinfo::System::name().unwrap_or_default(),
173 kernel: sysinfo::System::kernel_version().unwrap_or_default(),
174 os: sysinfo::System::os_version().unwrap_or_default(),
175 host_name: sysinfo::System::host_name().unwrap_or_default(),
176 cpu_cores: sysinfo::System::physical_core_count().unwrap_or(0),
177 server_resident_memory_bytes: resident_memory,
178 server_virtual_memory_bytes: virtual_memory,
179 })
180}
181
/// Query parameters for `stop_trace_handler`.
#[derive(serde::Deserialize)]
pub(crate) struct TraceParams {
    // Directory the trace parquet file is written into.
    path: String,
}
186
/// Query parameters for `get_execution_metrics_handler`.
#[derive(serde::Deserialize)]
pub(crate) struct ExecutionMetricsParams {
    // Plan identifier; parsed as a UUID by the handler.
    plan_id: String,
}
191
/// Query parameters for `get_cache_stats_handler`.
#[derive(serde::Deserialize)]
pub(crate) struct CacheStatsParams {
    // Directory the cache-stats parquet file is written into.
    path: String,
}
196
197pub(crate) async fn start_trace_handler(State(state): State<Arc<AppState>>) -> Json<ApiResponse> {
198 info!("Starting cache trace collection...");
199 let cache = state.liquid_cache.cache();
200 cache.enable_trace();
201
202 Json(ApiResponse {
203 message: "Cache trace collection started".to_string(),
204 status: "success".to_string(),
205 })
206}
207
208pub(crate) async fn stop_trace_handler(
209 Query(params): Query<TraceParams>,
210 State(state): State<Arc<AppState>>,
211) -> Json<ApiResponse> {
212 info!("Stopping cache trace collection...");
213 let save_path = Path::new(¶ms.path);
214
215 match save_trace_to_file(save_path, &state) {
216 Ok(_) => Json(ApiResponse {
217 message: format!(
218 "Cache trace collection stopped, saved to {}",
219 save_path.display()
220 ),
221 status: "success".to_string(),
222 }),
223 Err(e) => Json(ApiResponse {
224 message: format!("Failed to save trace: {e}"),
225 status: "error".to_string(),
226 }),
227 }
228}
229
230pub(crate) fn save_trace_to_file(
231 save_dir: &Path,
232 state: &AppState,
233) -> Result<(), Box<dyn std::error::Error>> {
234 let now = std::time::SystemTime::now();
235 let datetime = now.duration_since(std::time::UNIX_EPOCH).unwrap();
236 let minute = (datetime.as_secs() / 60) % 60;
237 let second = datetime.as_secs() % 60;
238 let trace_id = state
239 .trace_id
240 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
241 let filename = format!("cache-trace-id{trace_id:02}-{minute:02}-{second:03}.parquet",);
242
243 if !save_dir.exists() {
245 fs::create_dir_all(save_dir)?;
246 }
247
248 let file_path = save_dir.join(filename);
249 let cache = state.liquid_cache.cache();
250 cache.disable_trace();
251 cache.flush_trace(&file_path);
252 Ok(())
253}
254
255pub(crate) async fn get_execution_metrics_handler(
256 State(state): State<Arc<AppState>>,
257 Query(params): Query<ExecutionMetricsParams>,
258) -> Json<Option<ExecutionMetricsResponse>> {
259 let Ok(uuid) = Uuid::parse_str(¶ms.plan_id) else {
260 return Json(None);
261 };
262 let metrics = state.liquid_cache.inner().get_metrics(&uuid);
263 Json(metrics)
264}
265
266pub(crate) fn get_cache_stats_inner(
267 cache: &liquid_cache_parquet::LiquidCacheRef,
268 save_dir: impl AsRef<Path>,
269 state: &AppState,
270) -> Result<PathBuf, Box<dyn std::error::Error>> {
271 let now = std::time::SystemTime::now();
272 let datetime = now.duration_since(std::time::UNIX_EPOCH).unwrap();
273 let minute = (datetime.as_secs() / 60) % 60;
274 let second = datetime.as_secs() % 60;
275 let trace_id = state
276 .stats_id
277 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
278 let filename = format!("cache-stats-id{trace_id:02}-{minute:02}-{second:03}.parquet",);
279 let file_path = save_dir.as_ref().join(filename);
280 cache.write_stats(&file_path)?;
281 Ok(file_path)
282}
283
284pub(crate) async fn get_cache_stats_handler(
285 State(state): State<Arc<AppState>>,
286 Query(params): Query<CacheStatsParams>,
287) -> Json<ApiResponse> {
288 let cache = state.liquid_cache.cache();
289 match get_cache_stats_inner(cache, ¶ms.path, &state) {
290 Ok(file_path) => {
291 info!("Cache stats saved to {}", file_path.display());
292 Json(ApiResponse {
293 message: format!("Cache stats saved to {}", file_path.display()),
294 status: "success".to_string(),
295 })
296 }
297 Err(e) => Json(ApiResponse {
298 message: format!("Failed to get cache stats: {e}"),
299 status: "error".to_string(),
300 }),
301 }
302}
303
304pub(crate) async fn start_flamegraph_handler(
305 State(state): State<Arc<AppState>>,
306) -> Json<ApiResponse> {
307 info!("Starting flamegraph collection...");
308 state.flamegraph.start();
309 Json(ApiResponse {
310 message: "Flamegraph collection started".to_string(),
311 status: "success".to_string(),
312 })
313}
314
315impl From<&Arc<dyn ExecutionPlan>> for ExecutionPlanWithStats {
316 fn from(plan: &Arc<dyn ExecutionPlan>) -> Self {
317 let metrics = plan.metrics().unwrap().aggregate_by_name();
318 let mut metric_values = Vec::new();
319 for metric in metrics.iter() {
320 metric_values.push(MetricValues {
321 name: metric.value().name().to_string(),
322 value: metric.value().to_string(),
323 });
324 }
325
326 let mut column_statistics = Vec::new();
327 for (i, cs) in plan
328 .partition_statistics(None)
329 .unwrap()
330 .column_statistics
331 .iter()
332 .enumerate()
333 {
334 let min = if cs.min_value != Precision::Absent {
335 Some(cs.min_value.to_string())
336 } else {
337 None
338 };
339 let max = if cs.max_value != Precision::Absent {
340 Some(cs.max_value.to_string())
341 } else {
342 None
343 };
344 let sum = if cs.sum_value != Precision::Absent {
345 Some(cs.sum_value.to_string())
346 } else {
347 None
348 };
349 let distinct = if cs.distinct_count != Precision::Absent {
350 Some(cs.distinct_count.to_string())
351 } else {
352 None
353 };
354 let null = if cs.null_count != Precision::Absent {
355 Some(cs.null_count.to_string())
356 } else {
357 None
358 };
359 column_statistics.push(ColumnStatistics {
360 name: format!("col_{i}"),
361 null,
362 min,
363 max,
364 sum,
365 distinct_count: distinct,
366 });
367 }
368
369 ExecutionPlanWithStats {
370 name: plan.name().to_string(),
371 schema: plan
372 .schema()
373 .fields()
374 .iter()
375 .map(|field| SchemaField {
376 name: field.name().to_string(),
377 data_type: field.data_type().to_string(),
378 })
379 .collect(),
380 statistics: Statistics {
381 num_rows: plan
382 .partition_statistics(None)
383 .unwrap()
384 .num_rows
385 .to_string(),
386 total_byte_size: plan
387 .partition_statistics(None)
388 .unwrap()
389 .total_byte_size
390 .to_string(),
391 column_statistics,
392 },
393 metrics: metric_values,
394 children: plan
395 .children()
396 .iter()
397 .map(|child| (*child).into())
398 .collect(),
399 }
400 }
401}
402
403fn get_liquid_exec_info(plan: &Arc<dyn ExecutionPlan>) -> Option<String> {
404 let mut rv = None;
405 plan.apply(|node| {
406 let Some(data_source) = node.as_any().downcast_ref::<DataSourceExec>() else {
407 return Ok(TreeNodeRecursion::Continue);
408 };
409 let file_scan_config = data_source
410 .data_source()
411 .as_any()
412 .downcast_ref::<FileScanConfig>()
413 .expect("FileScanConfig not found");
414 let Some(liquid_source) = file_scan_config
415 .file_source()
416 .as_any()
417 .downcast_ref::<LiquidParquetSource>()
418 else {
419 return Ok(TreeNodeRecursion::Continue);
420 };
421 let predicate = liquid_source.predicate();
422
423 rv = predicate.map(|v| v.to_string());
424 Ok(TreeNodeRecursion::Stop)
425 })
426 .unwrap();
427 rv
428}
429
430pub(crate) async fn get_execution_stats(
431 State(state): State<Arc<AppState>>,
432) -> Json<Vec<ExecutionStatsWithPlan>> {
433 let execution_stats = state.liquid_cache.inner().get_execution_stats();
434 let mut rv = Vec::new();
435 for execution_stat in execution_stats {
436 let mut plans = Vec::new();
437 for plan_id in execution_stat.plan_ids.iter() {
438 let uuid = Uuid::parse_str(plan_id).expect("Invalid plan ID");
439 let plan = state
440 .liquid_cache
441 .inner()
442 .get_plan(&uuid)
443 .expect("Plan not found");
444 let model_plan = ExecutionPlanWithStats::from(&plan.plan);
445 let plan_info = PlanInfo {
446 id: plan_id.to_string(),
447 created_at: plan
448 .created_at
449 .duration_since(std::time::UNIX_EPOCH)
450 .unwrap()
451 .as_secs(),
452 plan: model_plan,
453 predicate: get_liquid_exec_info(&plan.plan),
454 };
455 plans.push(plan_info);
456 }
457 let execution_stats_with_plan = ExecutionStatsWithPlan {
458 execution_stats: execution_stat,
459 plans,
460 };
461 rv.push(execution_stats_with_plan);
462 }
463 Json(rv)
464}
465
466pub(crate) async fn stop_flamegraph_handler(
467 State(state): State<Arc<AppState>>,
468) -> Json<ApiResponse> {
469 let svg_content = if let Ok(svg_content) = state.flamegraph.stop_to_string() {
470 svg_content
471 } else {
472 return Json(ApiResponse {
473 message: "Flamegraph not generated".to_string(),
474 status: "error".to_string(),
475 });
476 };
477 Json(ApiResponse {
478 message: svg_content,
479 status: "success".to_string(),
480 })
481}
482
483pub(crate) async fn add_execution_stats_handler(
484 State(state): State<Arc<AppState>>,
485 Json(params): Json<ExecutionStats>,
486) -> Json<ApiResponse> {
487 let message = format!(
488 "Execution stats added for execution {}",
489 params.display_name
490 );
491 state.liquid_cache.inner().add_execution_stats(params);
492 Json(ApiResponse {
493 message,
494 status: "success".to_string(),
495 })
496}
497
498pub(crate) async fn start_disk_usage_monitor_handler(
499 State(state): State<Arc<AppState>>,
500) -> Json<ApiResponse> {
501 state.disk_monitor.clone().start_recording();
502 let message = "Successfully started disk usage monitoring";
503 Json(ApiResponse {
504 message: message.to_string(),
505 status: "success".to_string(),
506 })
507}
508
509pub(crate) async fn stop_disk_usage_monitor_handler(
510 State(state): State<Arc<AppState>>,
511) -> Json<ApiResponse> {
512 state.disk_monitor.clone().stop_recording();
513 let message = "Stopped disk usage monitoring";
514 Json(ApiResponse {
515 message: message.to_string(),
516 status: "success".to_string(),
517 })
518}
519
#[cfg(test)]
mod tests {
    use std::{io::Write, path::PathBuf};

    use tempfile::tempdir;

    use super::*;

    /// Writes `len` bytes of value `fill` to `path`.
    fn write_file(path: &std::path::Path, fill: u8, len: usize) {
        let mut file = std::fs::File::create(path).unwrap();
        file.write_all(&vec![fill; len]).unwrap();
    }

    #[test]
    fn test_get_parquet_cache_usage_inner() {
        let temp_dir = tempdir().unwrap();
        let temp_path = temp_dir.path();

        // Two files at the top level plus one in a subdirectory: the walk
        // must recurse and count all three.
        let subdir_path = temp_path.join("subdir");
        std::fs::create_dir(&subdir_path).unwrap();

        write_file(&temp_path.join("file1.parquet"), 1, 1000);
        write_file(&temp_path.join("file2.parquet"), 2, 2000);
        write_file(&subdir_path.join("file3.parquet"), 3, 3000);

        let result = get_parquet_cache_usage_inner(temp_path);
        assert_eq!(result.directory, temp_path.to_string_lossy().to_string());
        assert_eq!(result.file_count, 3);
        assert_eq!(result.total_size_bytes, 6000);
        assert_eq!(result.status, "success");
    }

    #[test]
    fn test_get_parquet_cache_usage_inner_empty_dir() {
        let temp_dir = tempdir().unwrap();
        let result = get_parquet_cache_usage_inner(temp_dir.path());
        assert_eq!(result.file_count, 0);
        assert_eq!(result.total_size_bytes, 0);
    }

    #[test]
    fn test_get_parquet_cache_usage_inner_nonexistent_dir() {
        // A missing directory must report zero usage rather than error.
        let missing = PathBuf::from("/path/does/not/exist");
        let result = get_parquet_cache_usage_inner(&missing);
        assert_eq!(result.file_count, 0);
        assert_eq!(result.total_size_bytes, 0);
    }
}