1use serde::{Deserialize, Serialize};
2use std::path::Path;
3use std::process::ExitStatus;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6use tokio_stream::{Stream, wrappers::ReceiverStream};
7
8#[cfg(unix)]
9use std::os::unix::process::ExitStatusExt;
10
11fn create_success_exit_status() -> ExitStatus {
13 #[cfg(unix)]
14 {
15 ExitStatus::from_raw(0)
16 }
17 #[cfg(not(unix))]
18 {
19 std::process::Command::new("cmd")
21 .args(&["/C", "echo", "ok"])
22 .output()
23 .map(|output| output.status)
24 .unwrap_or_else(|_| {
25 std::process::Command::new("true")
27 .output()
28 .map(|output| output.status)
29 .unwrap_or_else(|_| panic!("Cannot create exit status"))
30 })
31 }
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct InitProgress {
37 pub stage: String,
38 pub message: String,
39 pub percentage: f64,
40 pub current_step: usize,
41 pub total_steps: usize,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct DownloadProgress {
47 pub task_id: String,
48 pub file_name: String,
49 pub downloaded_bytes: u64,
50 pub total_bytes: u64,
51 pub download_speed: f64, pub eta_seconds: u64,
53 pub percentage: f64,
54 pub status: DownloadStatus,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub enum DownloadStatus {
59 Starting,
60 Downloading,
61 Paused,
62 Completed,
63 Failed(String),
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct SystemInfo {
69 pub os: String,
70 pub arch: String,
71 pub total_memory: u64,
72 pub available_memory: u64,
73 pub cpu_count: usize,
74 pub docker_version: Option<String>,
75 pub disk_space: DiskSpace,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct DiskSpace {
80 pub total: u64,
81 pub available: u64,
82 pub used: u64,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct ServiceStatus {
88 pub name: String,
89 pub status: String,
90 pub health: String,
91 pub uptime: Option<u64>,
92 pub cpu_usage: f64,
93 pub memory_usage: u64,
94 pub ports: Vec<String>,
95}
96
97pub async fn init_with_progress<F>(
99 working_dir: &Path,
100 progress_callback: F,
101) -> std::result::Result<(), Box<dyn std::error::Error>>
102where
103 F: Fn(InitProgress) + Send + Sync + 'static,
104{
105 let callback = Arc::new(progress_callback);
106
107 let steps = [
109 ("downloading", "Preparing initialization environment..."),
110 (
111 "extracting",
112 "Creating configuration files and directories...",
113 ),
114 ("loading", "Initializing DuckDB database..."),
115 ("starting", "Registering client..."),
116 ("configuring", "Finalizing initialization setup..."),
117 ];
118
119 let total_steps = steps.len();
120
121 let progress = InitProgress {
123 stage: steps[0].0.to_string(),
124 message: steps[0].1.to_string(),
125 percentage: 0.0,
126 current_step: 1,
127 total_steps,
128 };
129 callback(progress);
130
131 tokio::fs::create_dir_all(working_dir).await?;
133
134 let current_dir = std::env::current_dir()?;
136 std::env::set_current_dir(working_dir)?;
137
138 let progress = InitProgress {
140 stage: steps[1].0.to_string(),
141 message: steps[1].1.to_string(),
142 percentage: 20.0,
143 current_step: 2,
144 total_steps,
145 };
146 callback(progress);
147
148 use crate::init::run_init;
150 if let Err(e) = run_init(true).await {
151 std::env::set_current_dir(current_dir)?;
153 return Err(e.into());
154 }
155
156 let progress = InitProgress {
158 stage: steps[2].0.to_string(),
159 message: steps[2].1.to_string(),
160 percentage: 60.0,
161 current_step: 3,
162 total_steps,
163 };
164 callback(progress);
165
166 let progress = InitProgress {
168 stage: steps[3].0.to_string(),
169 message: steps[3].1.to_string(),
170 percentage: 80.0,
171 current_step: 4,
172 total_steps,
173 };
174 callback(progress);
175
176 let progress = InitProgress {
178 stage: steps[4].0.to_string(),
179 message: steps[4].1.to_string(),
180 percentage: 90.0,
181 current_step: 5,
182 total_steps,
183 };
184 callback(progress);
185
186 std::env::set_current_dir(current_dir)?;
188
189 let final_progress = InitProgress {
191 stage: "configuring".to_string(),
192 message: "Initialization completed!".to_string(),
193 percentage: 100.0,
194 current_step: total_steps,
195 total_steps,
196 };
197 callback(final_progress);
198
199 Ok(())
200}
201
202pub async fn download_with_progress<F>(
204 url: &str,
205 target_dir: &Path,
206 progress_callback: F,
207) -> std::result::Result<(), Box<dyn std::error::Error>>
208where
209 F: Fn(DownloadProgress) + Send + Sync + 'static,
210{
211 let callback = Arc::new(progress_callback);
212
213 let file_name = url.split('/').next_back().unwrap_or("unknown_file");
215 let task_id = format!("download_{}", chrono::Utc::now().timestamp());
216
217 let client = reqwest::Client::new();
219
220 let mut progress = DownloadProgress {
222 task_id: task_id.clone(),
223 file_name: file_name.to_string(),
224 downloaded_bytes: 0,
225 total_bytes: 0,
226 download_speed: 0.0,
227 eta_seconds: 0,
228 percentage: 0.0,
229 status: DownloadStatus::Starting,
230 };
231
232 callback(progress.clone());
233
234 let response = client.head(url).send().await?;
236 let total_size = response.content_length().unwrap_or(0);
237
238 progress.total_bytes = total_size;
239 progress.status = DownloadStatus::Downloading;
240 callback(progress.clone());
241
242 let mut response = client.get(url).send().await?;
244 let target_path = target_dir.join(file_name);
245
246 tokio::fs::create_dir_all(target_dir).await?;
248
249 let mut file = tokio::fs::File::create(&target_path).await?;
250 let mut downloaded = 0u64;
251 let start_time = std::time::Instant::now();
252 let mut last_update = start_time;
253
254 while let Some(chunk) = response.chunk().await? {
256 use tokio::io::AsyncWriteExt;
257 file.write_all(&chunk).await?;
258 downloaded += chunk.len() as u64;
259
260 let now = std::time::Instant::now();
261
262 if now.duration_since(last_update).as_millis() > 500 {
264 let elapsed = now.duration_since(start_time).as_secs_f64();
265 let speed = downloaded as f64 / elapsed;
266 let eta = if speed > 0.0 {
267 ((total_size - downloaded) as f64 / speed) as u64
268 } else {
269 0
270 };
271
272 progress.downloaded_bytes = downloaded;
273 progress.download_speed = speed;
274 progress.eta_seconds = eta;
275 progress.percentage = if total_size > 0 {
276 (downloaded as f64 / total_size as f64) * 100.0
277 } else {
278 0.0
279 };
280
281 callback(progress.clone());
282 last_update = now;
283 }
284 }
285
286 progress.downloaded_bytes = downloaded;
288 progress.percentage = 100.0;
289 progress.status = DownloadStatus::Completed;
290 callback(progress);
291
292 Ok(())
293}
294
295pub fn get_system_info() -> SystemInfo {
297 let os = std::env::consts::OS.to_string();
298 let arch = std::env::consts::ARCH.to_string();
299
300 let (total_memory, available_memory) = get_memory_info();
302
303 let cpu_count = num_cpus::get();
305
306 let docker_version = get_docker_version();
308
309 let disk_space = get_disk_space();
311
312 SystemInfo {
313 os,
314 arch,
315 total_memory,
316 available_memory,
317 cpu_count,
318 docker_version,
319 disk_space,
320 }
321}
322
323pub async fn monitor_services() -> impl Stream<Item = ServiceStatus> {
325 let (tx, rx) = mpsc::channel(100);
326
327 tokio::spawn(async move {
329 loop {
330 let services = get_all_services().await;
332
333 for service in services {
334 if tx.send(service).await.is_err() {
335 break;
336 }
337 }
338
339 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
341 }
342 });
343
344 ReceiverStream::new(rx)
345}
346
347fn get_memory_info() -> (u64, u64) {
349 #[cfg(target_os = "macos")]
350 {
351 use std::process::Command;
352
353 let output = Command::new("sysctl")
354 .args(["hw.memsize"])
355 .output()
356 .unwrap_or_else(|_| std::process::Output {
357 stdout: b"hw.memsize: 8589934592".to_vec(),
358 stderr: vec![],
359 status: create_success_exit_status(),
360 });
361
362 let output_str = String::from_utf8_lossy(&output.stdout);
363 let total = output_str
364 .split(':')
365 .nth(1)
366 .and_then(|s| s.trim().parse::<u64>().ok())
367 .unwrap_or(8589934592); let available = total / 4; (total, available)
372 }
373
374 #[cfg(target_os = "linux")]
375 {
376 use std::fs;
377
378 let meminfo = fs::read_to_string("/proc/meminfo")
379 .unwrap_or_else(|_| "MemTotal: 8388608 kB\nMemAvailable: 2097152 kB".to_string());
380
381 let mut total = 0;
382 let mut available = 0;
383
384 for line in meminfo.lines() {
385 if line.starts_with("MemTotal:") {
386 total = line
387 .split_whitespace()
388 .nth(1)
389 .and_then(|s| s.parse::<u64>().ok())
390 .unwrap_or(8388608)
391 * 1024; } else if line.starts_with("MemAvailable:") {
393 available = line
394 .split_whitespace()
395 .nth(1)
396 .and_then(|s| s.parse::<u64>().ok())
397 .unwrap_or(2097152)
398 * 1024; }
400 }
401
402 (total, available)
403 }
404
405 #[cfg(target_os = "windows")]
406 {
407 (8589934592, 2147483648) }
410
411 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
412 {
413 (8589934592, 2147483648) }
415}
416
417fn get_docker_version() -> Option<String> {
419 use std::process::Command;
420
421 Command::new("docker")
422 .args(["--version"])
423 .output()
424 .ok()
425 .and_then(|output| {
426 if output.status.success() {
427 Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
428 } else {
429 None
430 }
431 })
432}
433
434fn get_disk_space() -> DiskSpace {
436 #[cfg(unix)]
437 {
438 use std::process::Command;
439
440 let output = Command::new("df")
441 .args(["-h", "."])
442 .output()
443 .unwrap_or_else(|_| {
444 std::process::Output {
445 stdout: b"Filesystem Size Used Avail Use% Mounted on\n/dev/disk1s1 465G 120G 340G 27% /".to_vec(),
446 stderr: vec![],
447 status: create_success_exit_status(),
448 }
449 });
450
451 let output_str = String::from_utf8_lossy(&output.stdout);
452 let lines: Vec<&str> = output_str.lines().collect();
453
454 if lines.len() >= 2 {
455 let parts: Vec<&str> = lines[1].split_whitespace().collect();
456 if parts.len() >= 4 {
457 let total = parse_size(parts[1]).unwrap_or(500_000_000_000); let used = parse_size(parts[2]).unwrap_or(120_000_000_000); let available = parse_size(parts[3]).unwrap_or(380_000_000_000); return DiskSpace {
462 total,
463 used,
464 available,
465 };
466 }
467 }
468 }
469
470 DiskSpace {
472 total: 500_000_000_000, used: 120_000_000_000, available: 380_000_000_000, }
476}
477
478fn parse_size(size_str: &str) -> Option<u64> {
480 let size_str = size_str.trim();
481 let (num_str, unit) = if let Some(pos) = size_str.find(|c: char| c.is_alphabetic()) {
482 (&size_str[..pos], &size_str[pos..])
483 } else {
484 (size_str, "")
485 };
486
487 let num: f64 = num_str.parse().ok()?;
488 let multiplier = match unit.to_uppercase().as_str() {
489 "K" | "KB" => 1024u64,
490 "M" | "MB" => 1024u64 * 1024,
491 "G" | "GB" => 1024u64 * 1024 * 1024,
492 "T" | "TB" => 1024u64 * 1024 * 1024 * 1024,
493 _ => 1u64,
494 };
495
496 Some((num * multiplier as f64) as u64)
497}
498
499async fn get_all_services() -> Vec<ServiceStatus> {
501 use std::process::Command;
502
503 let output = Command::new("docker")
505 .args([
506 "ps",
507 "--format",
508 "table {{.Names}}\t{{.Status}}\t{{.Ports}}",
509 ])
510 .output()
511 .unwrap_or_else(|_| std::process::Output {
512 stdout: b"NAMES\tSTATUS\tPORTS\ntest-service\tUp 2 hours\t0.0.0.0:8080->8080/tcp"
513 .to_vec(),
514 stderr: vec![],
515 status: create_success_exit_status(),
516 });
517
518 let output_str = String::from_utf8_lossy(&output.stdout);
519 let mut services = Vec::new();
520
521 for line in output_str.lines().skip(1) {
522 let parts: Vec<&str> = line.split('\t').collect();
524 if parts.len() >= 3 {
525 services.push(ServiceStatus {
526 name: parts[0].to_string(),
527 status: parts[1].to_string(),
528 health: "healthy".to_string(), uptime: Some(7200), cpu_usage: 15.5, memory_usage: 256 * 1024 * 1024, ports: vec![parts[2].to_string()],
533 });
534 }
535 }
536
537 services
538}
539
540pub async fn get_ui_config() -> std::result::Result<serde_json::Value, Box<dyn std::error::Error>> {
542 let config = serde_json::json!({
545 "theme": "dark",
546 "language": "zh-CN",
547 "auto_refresh": true,
548 "refresh_interval": 5,
549 "show_notifications": true
550 });
551
552 Ok(config)
553}
554
555pub async fn update_ui_config(
556 config: serde_json::Value,
557) -> std::result::Result<(), Box<dyn std::error::Error>> {
558 if !config.is_object() {
561 return Err("Configuration must be a JSON object".to_string().into());
562 }
563
564 Ok(())
565}