use crate::config::NodeConfigManager;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::process::{Child, Command};
use tokio::signal;
use tokio::sync::{Mutex, RwLock};
use tokio::time::interval;
use tracing::{error, info, warn};

/// File that persists the serialized state of the running node.
const PID_FILE: &str = "qudag.pid";
/// Name of the node's log file.
const LOG_FILE: &str = "qudag.log";
/// Name of the node's configuration file.
const CONFIG_FILE: &str = "config.toml";

/// Runtime state of a managed node process, persisted to the PID file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeState {
    pub pid: u32,
    pub started_at: u64,
    pub port: u16,
    pub data_dir: PathBuf,
    pub log_file: PathBuf,
    pub config_file: PathBuf,
}

/// Configuration for the node manager itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeManagerConfig {
    /// Base directory for node files (defaults to `~/.qudag`).
    pub base_dir: PathBuf,
    /// Network port used when none is supplied at start.
    pub default_port: u16,
    /// Rotate the log file once it exceeds this size, in megabytes.
    pub log_rotation_size_mb: u64,
    /// Maximum number of rotated log files to keep.
    pub max_log_files: usize,
    /// Interval between background health checks, in seconds.
    pub health_check_interval: u64,
    /// How long to wait for a graceful shutdown, in seconds.
    pub shutdown_timeout: u64,
}

impl Default for NodeManagerConfig {
    fn default() -> Self {
        let home_dir = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
        let base_dir = home_dir.join(".qudag");

        Self {
            base_dir,
            default_port: 8000,
            log_rotation_size_mb: 100,
            max_log_files: 5,
            health_check_interval: 60,
            shutdown_timeout: 30,
        }
    }
}

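/// Manages the lifecycle of a QuDAG node process: starting and stopping,
/// health checks, systemd unit generation, and log rotation.
///
/// A minimal usage sketch (illustrative only; assumes a Tokio runtime and
/// that this crate exposes these types at the paths shown):
///
/// ```ignore
/// let manager = NodeManager::new(NodeManagerConfig::default())?;
/// manager.start_node(Some(8000), None, vec![], false).await?;
/// println!("{:?}", manager.get_status().await?);
/// manager.stop_node(false).await?;
/// ```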
pub struct NodeManager {
    /// Manager configuration.
    config: NodeManagerConfig,
    /// Last known state of the managed node process, if any.
    state: RwLock<Option<NodeState>>,
    /// Handle to a child process spawned in background mode.
    process: Mutex<Option<Child>>,
    /// Manager for the node's own configuration file.
    config_manager: NodeConfigManager,
}

impl NodeManager {
    pub fn new(config: NodeManagerConfig) -> Result<Self> {
        // Ensure the base directory exists before anything else touches it.
        fs::create_dir_all(&config.base_dir)?;

        let config_manager = NodeConfigManager::new(config.base_dir.join(CONFIG_FILE))?;

        // Pick up state left behind by a previous run, if that process is still alive.
        let state = Self::load_state(&config.base_dir)?;

        Ok(Self {
            config,
            state: RwLock::new(state),
            process: Mutex::new(None),
            config_manager,
        })
    }

    /// Returns true if a managed node process is currently alive.
    pub async fn is_running(&self) -> bool {
        if let Some(state) = self.state.read().await.as_ref() {
            Self::check_process_alive(state.pid).await
        } else {
            false
        }
    }

    pub async fn start_node(
        &self,
        port: Option<u16>,
        data_dir: Option<PathBuf>,
        peers: Vec<String>,
        foreground: bool,
    ) -> Result<()> {
        if self.is_running().await {
            return Err(anyhow!("Node is already running"));
        }

        let port = port.unwrap_or(self.config.default_port);
        let data_dir = data_dir.unwrap_or_else(|| self.config.base_dir.join("data"));

        fs::create_dir_all(&data_dir)?;

        // Persist the effective settings so restarts reuse them.
        self.config_manager
            .update_config(|config| {
                config.network_port = port;
                config.data_dir = data_dir.clone();
                if !peers.is_empty() {
                    config.initial_peers = peers.clone();
                }
                Ok(())
            })
            .await?;

        let log_file = self.config.base_dir.join(LOG_FILE);
        let log_file_handle = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_file)?;

        info!(
            "Starting QuDAG node on port {} with data dir {:?}",
            port, data_dir
        );

        // Re-invoke the current executable with the internal `run-node` subcommand.
        let mut cmd = Command::new(std::env::current_exe()?);
        cmd.arg("run-node")
            .arg("--port")
            .arg(port.to_string())
            .arg("--data-dir")
            .arg(data_dir.display().to_string());

        for peer in &peers {
            cmd.arg("--peer").arg(peer);
        }

        if !foreground {
            // In background mode, detach stdin and send stdout/stderr to the log file.
            cmd.stdin(Stdio::null())
                .stdout(Stdio::from(log_file_handle.try_clone()?))
                .stderr(Stdio::from(log_file_handle));
        }

        let mut child = cmd.spawn()?;
        let pid = child
            .id()
            .ok_or_else(|| anyhow!("Failed to get process ID"))?;

        let state = NodeState {
            pid,
            started_at: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
            port,
            data_dir,
            log_file: log_file.clone(),
            config_file: self.config.base_dir.join(CONFIG_FILE),
        };

        // Persist state so other invocations (status, stop) can find the node.
        Self::save_state(&self.config.base_dir, &state)?;
        *self.state.write().await = Some(state.clone());

        if foreground {
            info!("Running node in foreground mode");

            let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
            let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())?;

            tokio::select! {
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, shutting down...");
                }
                _ = sigint.recv() => {
                    info!("Received SIGINT, shutting down...");
                }
                status = child.wait() => {
                    match status {
                        Ok(status) => {
                            if status.success() {
                                info!("Node process exited successfully");
                            } else {
                                error!("Node process exited with status: {}", status);
                            }
                        }
                        Err(e) => {
                            error!("Failed to wait for node process: {}", e);
                        }
                    }
                }
            }

            // If we left the select because of a signal, the child may still be
            // running; terminate it before cleaning up. (kill returns an error
            // if the child has already been reaped, which we can ignore.)
            let _ = child.kill().await;

            self.cleanup_state().await?;
        } else {
            *self.process.lock().await = Some(child);

            // Spawn a background task that notices if the node dies.
            self.start_health_check().await;

            info!("Node started successfully in background (PID: {})", pid);
            info!("Log file: {:?}", log_file);
        }

        Ok(())
    }

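    /// Stops the managed node, trying SIGTERM first unless `force` is set,
    /// in which case SIGKILL is sent immediately.
    ///
    /// Illustrative call (assumes a running manager in a Tokio context):
    ///
    /// ```ignore
    /// manager.stop_node(false).await?; // graceful, falls back to SIGKILL
    /// ```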
    pub async fn stop_node(&self, force: bool) -> Result<()> {
        let state = match self.state.read().await.as_ref() {
            Some(s) => s.clone(),
            None => return Err(anyhow!("No node is currently running")),
        };

        info!("Stopping node (PID: {})", state.pid);

        if !force {
            if let Err(e) = self.graceful_shutdown(&state).await {
                warn!("Graceful shutdown failed: {}, attempting force kill", e);
                self.force_kill(&state).await?;
            }
        } else {
            self.force_kill(&state).await?;
        }

        self.cleanup_state().await?;

        info!("Node stopped successfully");
        Ok(())
    }

    pub async fn restart_node(&self, force: bool) -> Result<()> {
        info!("Restarting node...");

        // Capture the current settings so the new process starts the same way.
        let (port, data_dir, peers) = if let Some(state) = self.state.read().await.as_ref() {
            let config = self.config_manager.load_config().await?;
            (
                Some(state.port),
                Some(state.data_dir.clone()),
                config.initial_peers,
            )
        } else {
            return Err(anyhow!("No node is currently running"));
        };

        self.stop_node(force).await?;

        // Give the old process a moment to release its port and files.
        tokio::time::sleep(Duration::from_secs(2)).await;

        self.start_node(port, data_dir, peers, false).await?;

        Ok(())
    }

    pub async fn get_status(&self) -> Result<NodeStatus> {
        if let Some(state) = self.state.read().await.as_ref() {
            let is_running = Self::check_process_alive(state.pid).await;
            // Saturate rather than underflow if the clock moved backwards.
            let uptime = SystemTime::now()
                .duration_since(UNIX_EPOCH)?
                .as_secs()
                .saturating_sub(state.started_at);

            Ok(NodeStatus {
                is_running,
                pid: Some(state.pid),
                port: state.port,
                data_dir: state.data_dir.clone(),
                log_file: state.log_file.clone(),
                uptime_seconds: if is_running { Some(uptime) } else { None },
                last_health_check: None,
            })
        } else {
            Ok(NodeStatus {
                is_running: false,
                pid: None,
                port: self.config.default_port,
                data_dir: self.config.base_dir.join("data"),
                log_file: self.config.base_dir.join(LOG_FILE),
                uptime_seconds: None,
                last_health_check: None,
            })
        }
    }

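    /// Renders a systemd unit file for the node and, if `output_path` is
    /// given, writes it there.
    ///
    /// A hypothetical install flow (paths illustrative, requires root):
    ///
    /// ```ignore
    /// let unit = manager
    ///     .generate_systemd_service(Some(PathBuf::from("/etc/systemd/system/qudag.service")))
    ///     .await?;
    /// // then: systemctl daemon-reload && systemctl enable --now qudag
    /// ```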
    pub async fn generate_systemd_service(&self, output_path: Option<PathBuf>) -> Result<String> {
        let exe_path = std::env::current_exe()?;
        let config = self.config_manager.load_config().await?;

        let service_content = format!(
            r#"[Unit]
Description=QuDAG Protocol Node
After=network.target

[Service]
Type=simple
ExecStart={} start --port {} --data-dir {}
ExecStop={} stop
Restart=on-failure
RestartSec=10
User={}
WorkingDirectory={}

# Security settings
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths={}

# Resource limits
LimitNOFILE=65536
MemoryLimit=2G

[Install]
WantedBy=multi-user.target
"#,
            exe_path.display(),
            config.network_port,
            config.data_dir.display(),
            exe_path.display(),
            whoami::username(),
            self.config.base_dir.display(),
            config.data_dir.display(),
        );

        if let Some(path) = output_path {
            fs::write(&path, &service_content)?;
            info!("Systemd service file written to: {:?}", path);
        }

        Ok(service_content)
    }

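    /// Prints the last `lines` lines of the node log; with `follow`, keeps
    /// streaming new output until interrupted (requires the system `tail`).
    ///
    /// Illustrative call:
    ///
    /// ```ignore
    /// manager.tail_logs(100, false).await?;
    /// ```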
    pub async fn tail_logs(&self, lines: usize, follow: bool) -> Result<()> {
        let log_file = self.config.base_dir.join(LOG_FILE);

        if !log_file.exists() {
            return Err(anyhow!("Log file not found: {:?}", log_file));
        }

        if follow {
            // Delegate to the system `tail` for follow mode.
            let mut cmd = Command::new("tail");
            cmd.arg("-f")
                .arg("-n")
                .arg(lines.to_string())
                .arg(&log_file);

            let mut child = cmd.spawn()?;

            let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())?;

            tokio::select! {
                _ = sigint.recv() => {
                    child.kill().await?;
                }
                _ = child.wait() => {}
            }
        } else {
            // Print only the last `lines` lines of the log.
            let content = fs::read_to_string(&log_file)?;
            let lines_vec: Vec<&str> = content.lines().collect();
            let start = lines_vec.len().saturating_sub(lines);

            for line in &lines_vec[start..] {
                println!("{}", line);
            }
        }

        Ok(())
    }

    fn load_state(base_dir: &Path) -> Result<Option<NodeState>> {
        let pid_file = base_dir.join(PID_FILE);

        if !pid_file.exists() {
            return Ok(None);
        }

        let content = fs::read_to_string(&pid_file)?;
        let state: NodeState = serde_json::from_str(&content)?;

        // Only trust the saved state if the recorded process is still alive.
        if Self::check_process_alive_sync(state.pid) {
            Ok(Some(state))
        } else {
            // Stale PID file from a dead process; remove it.
            let _ = fs::remove_file(&pid_file);
            Ok(None)
        }
    }

    fn save_state(base_dir: &Path, state: &NodeState) -> Result<()> {
        let pid_file = base_dir.join(PID_FILE);
        let content = serde_json::to_string_pretty(state)?;
        fs::write(&pid_file, content)?;
        Ok(())
    }

    async fn check_process_alive(pid: u32) -> bool {
        // `kill -0` sends no signal but reports whether the PID can be signaled.
        match Command::new("kill")
            .arg("-0")
            .arg(pid.to_string())
            .output()
            .await
        {
            Ok(output) => output.status.success(),
            Err(_) => false,
        }
    }

    fn check_process_alive_sync(pid: u32) -> bool {
        // Signal 0 performs error checking only; a zero return means the
        // process exists and we are allowed to signal it.
        #[allow(unsafe_code)]
        unsafe {
            libc::kill(pid as i32, 0) == 0
        }
    }

    async fn graceful_shutdown(&self, state: &NodeState) -> Result<()> {
        info!("Attempting graceful shutdown of PID {}", state.pid);

        // Ask the process to exit with SIGTERM.
        Command::new("kill")
            .arg("-TERM")
            .arg(state.pid.to_string())
            .output()
            .await?;

        // Poll until the process exits or the shutdown timeout elapses.
        let deadline =
            tokio::time::Instant::now() + Duration::from_secs(self.config.shutdown_timeout);
        let mut check_interval = interval(Duration::from_millis(100));

        while tokio::time::Instant::now() < deadline {
            check_interval.tick().await;

            if !Self::check_process_alive(state.pid).await {
                info!("Process {} exited gracefully", state.pid);
                return Ok(());
            }
        }

        Err(anyhow!("Process did not exit within timeout"))
    }

    async fn force_kill(&self, state: &NodeState) -> Result<()> {
        warn!("Force killing process {}", state.pid);

        Command::new("kill")
            .arg("-KILL")
            .arg(state.pid.to_string())
            .output()
            .await?;

        // Give the kernel a moment to reap the process before verifying.
        tokio::time::sleep(Duration::from_millis(500)).await;

        if Self::check_process_alive(state.pid).await {
            return Err(anyhow!("Failed to kill process {}", state.pid));
        }

        Ok(())
    }

    async fn cleanup_state(&self) -> Result<()> {
        *self.state.write().await = None;
        *self.process.lock().await = None;

        let pid_file = self.config.base_dir.join(PID_FILE);
        if pid_file.exists() {
            fs::remove_file(&pid_file)?;
        }

        Ok(())
    }

    async fn start_health_check(&self) {
        let interval_secs = self.config.health_check_interval;
        let base_dir = self.config.base_dir.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(interval_secs));

            loop {
                interval.tick().await;

                if let Ok(Some(state)) = Self::load_state(&base_dir) {
                    if !Self::check_process_alive(state.pid).await {
                        warn!("Node process {} is no longer running", state.pid);
                        // Remove the stale PID file and stop checking.
                        let _ = fs::remove_file(base_dir.join(PID_FILE));
                        break;
                    }
                } else {
                    // No state on disk means the node was stopped; end the task.
                    break;
                }
            }
        });
    }

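    /// Rotates the log file once it exceeds the configured size, renaming it
    /// to `qudag.log.N` with the smallest unused `N`, then prunes old
    /// rotations down to `max_log_files`.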
    pub async fn rotate_logs(&self) -> Result<()> {
        let log_file = self.config.base_dir.join(LOG_FILE);

        if !log_file.exists() {
            return Ok(());
        }

        let metadata = fs::metadata(&log_file)?;
        let size_mb = metadata.len() / (1024 * 1024);

        if size_mb >= self.config.log_rotation_size_mb {
            info!("Rotating log file (size: {} MB)", size_mb);

            // Find the first unused rotation suffix (qudag.log.1, qudag.log.2, ...).
            let mut rotation_num = 1;
            while self
                .config
                .base_dir
                .join(format!("{}.{}", LOG_FILE, rotation_num))
                .exists()
            {
                rotation_num += 1;
            }

            let rotated_file = self
                .config
                .base_dir
                .join(format!("{}.{}", LOG_FILE, rotation_num));
            fs::rename(&log_file, &rotated_file)?;

            self.cleanup_old_logs().await?;
        }

        Ok(())
    }

    async fn cleanup_old_logs(&self) -> Result<()> {
        let mut log_files: Vec<(PathBuf, u64)> = Vec::new();

        // Collect rotated log files (qudag.log.N) with their modification times.
        for entry in fs::read_dir(&self.config.base_dir)? {
            let entry = entry?;
            let path = entry.path();

            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                if name.starts_with(LOG_FILE) && name != LOG_FILE {
                    let metadata = fs::metadata(&path)?;
                    let modified = metadata.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
                    log_files.push((path, modified));
                }
            }
        }

        // Oldest first; remove from the front until we are within the limit.
        log_files.sort_by_key(|&(_, time)| time);

        while log_files.len() > self.config.max_log_files {
            if let Some((path, _)) = log_files.first() {
                info!("Removing old log file: {:?}", path);
                fs::remove_file(path)?;
                log_files.remove(0);
            }
        }

        Ok(())
    }
}

/// Snapshot of a node's status as reported by [`NodeManager::get_status`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
    pub is_running: bool,
    pub pid: Option<u32>,
    pub port: u16,
    pub data_dir: PathBuf,
    pub log_file: PathBuf,
    pub uptime_seconds: Option<u64>,
    pub last_health_check: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_node_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = NodeManagerConfig {
            base_dir: temp_dir.path().to_path_buf(),
            ..Default::default()
        };

        let manager = NodeManager::new(config).unwrap();
        assert!(!manager.is_running().await);
    }

    #[test]
    fn test_process_alive_check() {
        // The current process is always alive.
        let current_pid = std::process::id();
        assert!(NodeManager::check_process_alive_sync(current_pid));

        // A PID far outside the typical range should not exist.
        assert!(!NodeManager::check_process_alive_sync(999999));
    }
}