qudag_cli/node_manager.rs

1use crate::config::NodeConfigManager;
2use anyhow::{anyhow, Result};
3use serde::{Deserialize, Serialize};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::process::Stdio;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8use tokio::process::{Child, Command};
9use tokio::signal;
10use tokio::sync::{Mutex, RwLock};
11use tokio::time::interval;
12use tracing::{error, info, warn};
13
/// Process ID file location.
///
/// NOTE(review): despite the name, this file stores the full serialized
/// `NodeState` as JSON (see `save_state`), not just a bare PID.
const PID_FILE: &str = "qudag.pid";
/// Node log file name; rotated copies get a numeric suffix (e.g. `qudag.log.1`).
const LOG_FILE: &str = "qudag.log";
/// Node configuration file name, managed by `NodeConfigManager`.
const CONFIG_FILE: &str = "config.toml";
18
/// Node state information
///
/// Snapshot of a spawned node process, persisted as JSON in the PID file so
/// later CLI invocations can locate and manage the process.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeState {
    /// OS process ID of the spawned node.
    pub pid: u32,
    /// Start time, in seconds since the Unix epoch.
    pub started_at: u64,
    /// Network port the node was started with.
    pub port: u16,
    /// Directory holding the node's data.
    pub data_dir: PathBuf,
    /// Path to the node's log file.
    pub log_file: PathBuf,
    /// Path to the node's configuration file.
    pub config_file: PathBuf,
}
29
/// Node manager configuration
///
/// Controls where node data and state live and how the manager supervises
/// the node process (health checking, shutdown, log rotation).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeManagerConfig {
    /// Base directory for node data (defaults to `~/.qudag`).
    pub base_dir: PathBuf,
    /// Default network port, used when `start_node` is given no port.
    pub default_port: u16,
    /// Log rotation size in MB; `rotate_logs` rotates at or above this size.
    pub log_rotation_size_mb: u64,
    /// Max rotated log files to keep before the oldest are deleted.
    pub max_log_files: usize,
    /// Health check interval in seconds for the background watcher task.
    pub health_check_interval: u64,
    /// Graceful shutdown timeout in seconds before falling back to SIGKILL.
    pub shutdown_timeout: u64,
}
46
47impl Default for NodeManagerConfig {
48    fn default() -> Self {
49        let home_dir = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
50        let base_dir = home_dir.join(".qudag");
51
52        Self {
53            base_dir,
54            default_port: 8000,
55            log_rotation_size_mb: 100,
56            max_log_files: 5,
57            health_check_interval: 60,
58            shutdown_timeout: 30,
59        }
60    }
61}
62
/// Node process manager
///
/// Owns the lifecycle of a single QuDAG node process: spawning, liveness
/// checking, stopping, log handling, and state persistence.
pub struct NodeManager {
    /// Static manager settings (directories, timeouts, limits).
    config: NodeManagerConfig,
    /// In-memory copy of the persisted node state; `None` when no node is tracked.
    state: RwLock<Option<NodeState>>,
    /// Handle to a background-spawned child, when this process started it.
    process: Mutex<Option<Child>>,
    /// Manages the on-disk `config.toml`.
    config_manager: NodeConfigManager,
}
70
71impl NodeManager {
72    /// Create a new node manager
73    pub fn new(config: NodeManagerConfig) -> Result<Self> {
74        // Create base directory if it doesn't exist
75        fs::create_dir_all(&config.base_dir)?;
76
77        // Initialize config manager
78        let config_manager = NodeConfigManager::new(config.base_dir.join(CONFIG_FILE))?;
79
80        // Load existing state if available
81        let state = Self::load_state(&config.base_dir)?;
82
83        Ok(Self {
84            config,
85            state: RwLock::new(state),
86            process: Mutex::new(None),
87            config_manager,
88        })
89    }
90
91    /// Check if a node is currently running
92    pub async fn is_running(&self) -> bool {
93        if let Some(state) = self.state.read().await.as_ref() {
94            // Check if process is still alive
95            Self::check_process_alive(state.pid).await
96        } else {
97            false
98        }
99    }
100
    /// Start the node process
    ///
    /// Spawns the current executable with the `run-node` subcommand,
    /// persists the resulting state (PID, port, paths) to disk and memory,
    /// and then either blocks in the foreground until the child exits or a
    /// termination signal arrives, or detaches into the background with
    /// output redirected to the log file.
    ///
    /// # Arguments
    /// * `port` - Network port to listen on (defaults to `default_port`)
    /// * `data_dir` - Node data directory (defaults to `<base_dir>/data`)
    /// * `peers` - Initial peer addresses; written to config only when non-empty
    /// * `foreground` - When true, run attached; when false, daemonize
    ///
    /// # Errors
    /// Fails if a node is already running, or on I/O / spawn / config errors.
    pub async fn start_node(
        &self,
        port: Option<u16>,
        data_dir: Option<PathBuf>,
        peers: Vec<String>,
        foreground: bool,
    ) -> Result<()> {
        // Check if already running
        if self.is_running().await {
            return Err(anyhow!("Node is already running"));
        }

        let port = port.unwrap_or(self.config.default_port);
        let data_dir = data_dir.unwrap_or_else(|| self.config.base_dir.join("data"));

        // Create data directory
        fs::create_dir_all(&data_dir)?;

        // Update configuration so the spawned node and later restarts agree
        // on port, data directory, and peers.
        self.config_manager
            .update_config(|config| {
                config.network_port = port;
                config.data_dir = data_dir.clone();
                if !peers.is_empty() {
                    config.initial_peers = peers.clone();
                }
                Ok(())
            })
            .await?;

        // Prepare log file (append mode so restarts don't clobber history)
        let log_file = self.config.base_dir.join(LOG_FILE);
        let log_file_handle = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_file)?;

        info!(
            "Starting QuDAG node on port {} with data dir {:?}",
            port, data_dir
        );

        // Build command: re-exec ourselves with the `run-node` subcommand.
        // NOTE(review): assumes the CLI exposes `run-node` with these flags
        // — verify against the argument parser.
        let mut cmd = Command::new(std::env::current_exe()?);
        cmd.arg("run-node")
            .arg("--port")
            .arg(port.to_string())
            .arg("--data-dir")
            .arg(data_dir.display().to_string());

        for peer in &peers {
            cmd.arg("--peer").arg(peer);
        }

        // Configure process: in background mode, detach stdin and send both
        // stdout and stderr to the shared log file.
        if !foreground {
            cmd.stdin(Stdio::null())
                .stdout(Stdio::from(log_file_handle.try_clone()?))
                .stderr(Stdio::from(log_file_handle));
        }

        // Spawn process
        let mut child = cmd.spawn()?;
        let pid = child
            .id()
            .ok_or_else(|| anyhow!("Failed to get process ID"))?;

        // Create node state
        let state = NodeState {
            pid,
            started_at: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
            port,
            data_dir,
            log_file: log_file.clone(),
            config_file: self.config.base_dir.join(CONFIG_FILE),
        };

        // Save state: to disk for other CLI invocations, in memory for us.
        Self::save_state(&self.config.base_dir, &state)?;
        *self.state.write().await = Some(state.clone());

        if foreground {
            // Run in foreground: block until the child exits or we receive
            // SIGTERM/SIGINT.
            info!("Running node in foreground mode");

            // Set up signal handlers (Unix-only API)
            let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
            let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())?;

            tokio::select! {
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, shutting down...");
                }
                _ = sigint.recv() => {
                    info!("Received SIGINT, shutting down...");
                }
                status = child.wait() => {
                    match status {
                        Ok(status) => {
                            if status.success() {
                                info!("Node process exited successfully");
                            } else {
                                error!("Node process exited with status: {}", status);
                            }
                        }
                        Err(e) => {
                            error!("Failed to wait for node process: {}", e);
                        }
                    }
                }
            }

            // Clean up
            self.cleanup_state().await?;
        } else {
            // Store process handle for background mode
            *self.process.lock().await = Some(child);

            // Start health check task
            self.start_health_check().await;

            info!("Node started successfully in background (PID: {})", pid);
            info!("Log file: {:?}", log_file);
        }

        Ok(())
    }
229
230    /// Stop the running node
231    pub async fn stop_node(&self, force: bool) -> Result<()> {
232        let state = match self.state.read().await.as_ref() {
233            Some(s) => s.clone(),
234            None => return Err(anyhow!("No node is currently running")),
235        };
236
237        info!("Stopping node (PID: {})", state.pid);
238
239        // Try graceful shutdown first
240        if !force {
241            if let Err(e) = self.graceful_shutdown(&state).await {
242                warn!("Graceful shutdown failed: {}, attempting force kill", e);
243                self.force_kill(&state).await?;
244            }
245        } else {
246            self.force_kill(&state).await?;
247        }
248
249        // Clean up state
250        self.cleanup_state().await?;
251
252        info!("Node stopped successfully");
253        Ok(())
254    }
255
256    /// Restart the node
257    pub async fn restart_node(&self, force: bool) -> Result<()> {
258        info!("Restarting node...");
259
260        // Get current configuration
261        let (port, data_dir, peers) = if let Some(state) = self.state.read().await.as_ref() {
262            let config = self.config_manager.load_config().await?;
263            (
264                Some(state.port),
265                Some(state.data_dir.clone()),
266                config.initial_peers,
267            )
268        } else {
269            return Err(anyhow!("No node is currently running"));
270        };
271
272        // Stop the node
273        self.stop_node(force).await?;
274
275        // Wait a moment for cleanup
276        tokio::time::sleep(Duration::from_secs(2)).await;
277
278        // Start the node with same configuration
279        self.start_node(port, data_dir, peers, false).await?;
280
281        Ok(())
282    }
283
284    /// Get node status
285    pub async fn get_status(&self) -> Result<NodeStatus> {
286        if let Some(state) = self.state.read().await.as_ref() {
287            let is_running = Self::check_process_alive(state.pid).await;
288            let uptime = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() - state.started_at;
289
290            Ok(NodeStatus {
291                is_running,
292                pid: Some(state.pid),
293                port: state.port,
294                data_dir: state.data_dir.clone(),
295                log_file: state.log_file.clone(),
296                uptime_seconds: if is_running { Some(uptime) } else { None },
297                last_health_check: None, // TODO: Implement health check tracking
298            })
299        } else {
300            Ok(NodeStatus {
301                is_running: false,
302                pid: None,
303                port: self.config.default_port,
304                data_dir: self.config.base_dir.join("data"),
305                log_file: self.config.base_dir.join(LOG_FILE),
306                uptime_seconds: None,
307                last_health_check: None,
308            })
309        }
310    }
311
    /// Generate systemd service file
    ///
    /// Renders a systemd unit that runs this executable with the currently
    /// configured port and data directory, optionally writing it to
    /// `output_path`; the rendered content is always returned.
    ///
    /// NOTE(review): the unit invokes `start`/`stop` subcommands while
    /// `start_node` spawns `run-node` — confirm both exist in the CLI.
    pub async fn generate_systemd_service(&self, output_path: Option<PathBuf>) -> Result<String> {
        let exe_path = std::env::current_exe()?;
        let config = self.config_manager.load_config().await?;

        let service_content = format!(
            r#"[Unit]
Description=QuDAG Protocol Node
After=network.target

[Service]
Type=simple
ExecStart={} start --port {} --data-dir {}
ExecStop={} stop
Restart=on-failure
RestartSec=10
User={}
WorkingDirectory={}

# Security settings
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths={}

# Resource limits
LimitNOFILE=65536
MemoryLimit=2G

[Install]
WantedBy=multi-user.target
"#,
            exe_path.display(),
            config.network_port,
            config.data_dir.display(),
            exe_path.display(),
            // Unit runs as the user generating it (via the `whoami` crate).
            whoami::username(),
            self.config.base_dir.display(),
            config.data_dir.display(),
        );

        if let Some(path) = output_path {
            fs::write(&path, &service_content)?;
            info!("Systemd service file written to: {:?}", path);
        }

        Ok(service_content)
    }
361
362    /// Tail log file
363    pub async fn tail_logs(&self, lines: usize, follow: bool) -> Result<()> {
364        let log_file = self.config.base_dir.join(LOG_FILE);
365
366        if !log_file.exists() {
367            return Err(anyhow!("Log file not found: {:?}", log_file));
368        }
369
370        if follow {
371            // Use tail -f equivalent
372            let mut cmd = Command::new("tail");
373            cmd.arg("-f")
374                .arg("-n")
375                .arg(lines.to_string())
376                .arg(&log_file);
377
378            let mut child = cmd.spawn()?;
379
380            // Set up signal handler
381            let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())?;
382
383            tokio::select! {
384                _ = sigint.recv() => {
385                    child.kill().await?;
386                }
387                _ = child.wait() => {}
388            }
389        } else {
390            // Read last N lines
391            let content = fs::read_to_string(&log_file)?;
392            let lines_vec: Vec<&str> = content.lines().collect();
393            let start = lines_vec.len().saturating_sub(lines);
394
395            for line in &lines_vec[start..] {
396                println!("{}", line);
397            }
398        }
399
400        Ok(())
401    }
402
403    /// Load node state from disk
404    fn load_state(base_dir: &Path) -> Result<Option<NodeState>> {
405        let pid_file = base_dir.join(PID_FILE);
406
407        if !pid_file.exists() {
408            return Ok(None);
409        }
410
411        let content = fs::read_to_string(&pid_file)?;
412        let state: NodeState = serde_json::from_str(&content)?;
413
414        // Verify process is still alive
415        if Self::check_process_alive_sync(state.pid) {
416            Ok(Some(state))
417        } else {
418            // Clean up stale PID file
419            let _ = fs::remove_file(&pid_file);
420            Ok(None)
421        }
422    }
423
424    /// Save node state to disk
425    fn save_state(base_dir: &Path, state: &NodeState) -> Result<()> {
426        let pid_file = base_dir.join(PID_FILE);
427        let content = serde_json::to_string_pretty(state)?;
428        fs::write(&pid_file, content)?;
429        Ok(())
430    }
431
432    /// Check if a process is alive (async)
433    async fn check_process_alive(pid: u32) -> bool {
434        // Use kill -0 to check if process exists
435        match Command::new("kill")
436            .arg("-0")
437            .arg(pid.to_string())
438            .output()
439            .await
440        {
441            Ok(output) => output.status.success(),
442            Err(_) => false,
443        }
444    }
445
446    /// Check if a process is alive (sync)
447    fn check_process_alive_sync(pid: u32) -> bool {
448        // Use nix crate or system call
449        #[allow(unsafe_code)]
450        unsafe {
451            libc::kill(pid as i32, 0) == 0
452        }
453    }
454
455    /// Graceful shutdown
456    async fn graceful_shutdown(&self, state: &NodeState) -> Result<()> {
457        info!("Attempting graceful shutdown of PID {}", state.pid);
458
459        // Send SIGTERM
460        Command::new("kill")
461            .arg("-TERM")
462            .arg(state.pid.to_string())
463            .output()
464            .await?;
465
466        // Wait for process to exit
467        let deadline =
468            tokio::time::Instant::now() + Duration::from_secs(self.config.shutdown_timeout);
469        let mut check_interval = interval(Duration::from_millis(100));
470
471        while tokio::time::Instant::now() < deadline {
472            check_interval.tick().await;
473
474            if !Self::check_process_alive(state.pid).await {
475                info!("Process {} exited gracefully", state.pid);
476                return Ok(());
477            }
478        }
479
480        Err(anyhow!("Process did not exit within timeout"))
481    }
482
483    /// Force kill the process
484    async fn force_kill(&self, state: &NodeState) -> Result<()> {
485        warn!("Force killing process {}", state.pid);
486
487        Command::new("kill")
488            .arg("-KILL")
489            .arg(state.pid.to_string())
490            .output()
491            .await?;
492
493        // Wait a moment for process to die
494        tokio::time::sleep(Duration::from_millis(500)).await;
495
496        if Self::check_process_alive(state.pid).await {
497            return Err(anyhow!("Failed to kill process {}", state.pid));
498        }
499
500        Ok(())
501    }
502
503    /// Clean up state files
504    async fn cleanup_state(&self) -> Result<()> {
505        *self.state.write().await = None;
506        *self.process.lock().await = None;
507
508        let pid_file = self.config.base_dir.join(PID_FILE);
509        if pid_file.exists() {
510            fs::remove_file(&pid_file)?;
511        }
512
513        Ok(())
514    }
515
516    /// Start health check task
517    async fn start_health_check(&self) {
518        let interval_secs = self.config.health_check_interval;
519        let base_dir = self.config.base_dir.clone();
520
521        tokio::spawn(async move {
522            let mut interval = interval(Duration::from_secs(interval_secs));
523
524            loop {
525                interval.tick().await;
526
527                // Load current state
528                if let Ok(Some(state)) = Self::load_state(&base_dir) {
529                    if !Self::check_process_alive(state.pid).await {
530                        warn!("Node process {} is no longer running", state.pid);
531                        // Clean up stale PID file
532                        let _ = fs::remove_file(base_dir.join(PID_FILE));
533                        break;
534                    }
535                } else {
536                    // No state, exit health check
537                    break;
538                }
539            }
540        });
541    }
542
543    /// Rotate log files if needed
544    pub async fn rotate_logs(&self) -> Result<()> {
545        let log_file = self.config.base_dir.join(LOG_FILE);
546
547        if !log_file.exists() {
548            return Ok(());
549        }
550
551        let metadata = fs::metadata(&log_file)?;
552        let size_mb = metadata.len() / (1024 * 1024);
553
554        if size_mb >= self.config.log_rotation_size_mb {
555            info!("Rotating log file (size: {} MB)", size_mb);
556
557            // Find next available rotation number
558            let mut rotation_num = 1;
559            while self
560                .config
561                .base_dir
562                .join(format!("{}.{}", LOG_FILE, rotation_num))
563                .exists()
564            {
565                rotation_num += 1;
566            }
567
568            // Rotate current log
569            let rotated_file = self
570                .config
571                .base_dir
572                .join(format!("{}.{}", LOG_FILE, rotation_num));
573            fs::rename(&log_file, &rotated_file)?;
574
575            // Clean up old logs
576            self.cleanup_old_logs().await?;
577        }
578
579        Ok(())
580    }
581
582    /// Clean up old log files
583    async fn cleanup_old_logs(&self) -> Result<()> {
584        let mut log_files: Vec<(PathBuf, u64)> = Vec::new();
585
586        // Find all rotated log files
587        for entry in fs::read_dir(&self.config.base_dir)? {
588            let entry = entry?;
589            let path = entry.path();
590
591            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
592                if name.starts_with(LOG_FILE) && name != LOG_FILE {
593                    let metadata = fs::metadata(&path)?;
594                    let modified = metadata.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
595                    log_files.push((path, modified));
596                }
597            }
598        }
599
600        // Sort by modification time (oldest first)
601        log_files.sort_by_key(|&(_, time)| time);
602
603        // Remove oldest files if we exceed max_log_files
604        while log_files.len() > self.config.max_log_files {
605            if let Some((path, _)) = log_files.first() {
606                info!("Removing old log file: {:?}", path);
607                fs::remove_file(path)?;
608                log_files.remove(0);
609            }
610        }
611
612        Ok(())
613    }
614}
615
/// Node status information
///
/// Returned by `NodeManager::get_status`; reflects either the recorded
/// state of a tracked node or the configured defaults when none is tracked.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
    /// Whether the recorded PID currently refers to a live process.
    pub is_running: bool,
    /// Recorded PID, if any node state is known.
    pub pid: Option<u32>,
    /// Network port (recorded, or the configured default).
    pub port: u16,
    /// Node data directory.
    pub data_dir: PathBuf,
    /// Node log file path.
    pub log_file: PathBuf,
    /// Seconds since start, populated only while the node is running.
    pub uptime_seconds: Option<u64>,
    /// Timestamp of the last health check (tracking not yet implemented).
    pub last_health_check: Option<u64>,
}
627
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creating a manager in a fresh directory must succeed and report no
    /// running node.
    #[tokio::test]
    async fn test_node_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = NodeManagerConfig {
            base_dir: temp_dir.path().to_path_buf(),
            ..Default::default()
        };

        let manager = NodeManager::new(config).unwrap();
        assert!(!manager.is_running().await);
    }

    #[test]
    fn test_process_alive_check() {
        // Check current process (should be alive)
        let current_pid = std::process::id();
        assert!(NodeManager::check_process_alive_sync(current_pid));

        // The previous version probed the hard-coded PID 999999, which can
        // be a *live* process on systems with a large pid_max (Linux allows
        // up to 4194304). Instead, spawn and reap a short-lived child: its
        // PID is known-dead immediately after wait() (modulo an extremely
        // unlikely instant PID reuse).
        let mut child = std::process::Command::new("true")
            .spawn()
            .expect("failed to spawn probe process");
        let dead_pid = child.id();
        child.wait().expect("failed to reap probe process");
        assert!(!NodeManager::check_process_alive_sync(dead_pid));
    }
}