casper_devnet/
process.rs

1use crate::assets::{AssetsLayout, BOOTSTRAP_NODES};
2use crate::node_launcher::Launcher;
3use crate::state::{ProcessGroup, ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{Result, anyhow};
5use nix::errno::Errno;
6use nix::sys::signal::{Signal, kill};
7use nix::unistd::Pid;
8use std::collections::BTreeMap;
9use std::path::Path;
10use std::sync::{
11    Arc,
12    atomic::{AtomicBool, AtomicU32, Ordering},
13};
14use std::time::{Duration, Instant};
15use time::OffsetDateTime;
16use tokio::fs as tokio_fs;
17use tokio::fs::OpenOptions;
18use tokio::process::Command;
19use tokio::time::sleep;
20
/// A started process plus its handle for lifecycle tracking.
pub struct RunningProcess {
    /// Descriptive record for the process (id, command, pid, log paths, status).
    pub record: ProcessRecord,
    /// Live handle to the process: either an OS child or a tokio task.
    pub handle: ProcessHandle,
}
26
/// Handle types for started processes.
pub enum ProcessHandle {
    /// A child spawned directly via `tokio::process::Command`; in this file
    /// that is how sidecars are started.
    Child(tokio::process::Child),
    /// A tokio task whose future yields the launcher's result; in this file
    /// that is how nodes are started.
    Task(tokio::task::JoinHandle<Result<()>>),
}
32
/// Start parameters derived from CLI arguments.
///
/// Derives `Debug` and `Clone` so a plan can be logged and reused by callers.
#[derive(Debug, Clone)]
pub struct StartPlan {
    /// Log filter handed to each node launcher and exported as `RUST_LOG`
    /// to sidecar processes.
    pub rust_log: String,
}
37
38/// Start all nodes (and sidecars if available), updating state on success.
39pub async fn start(
40    layout: &AssetsLayout,
41    plan: &StartPlan,
42    state: &mut State,
43) -> Result<Vec<RunningProcess>> {
44    let total_nodes = layout.count_nodes().await?;
45    if total_nodes == 0 {
46        return Err(anyhow!(
47            "no nodes found under {}",
48            layout.nodes_dir().display()
49        ));
50    }
51    let node_ids: Vec<u32> = (1..=total_nodes).collect();
52
53    let mut started = Vec::new();
54    for node_id in node_ids {
55        if node_id > total_nodes {
56            return Err(anyhow!(
57                "node {} exceeds total nodes {}",
58                node_id,
59                total_nodes
60            ));
61        }
62        let mut records = start_node(layout, node_id, total_nodes, &plan.rust_log).await?;
63        started.append(&mut records);
64    }
65
66    state.processes = started.iter().map(|proc| proc.record.clone()).collect();
67    state.touch().await?;
68
69    Ok(started)
70}
71
72/// Stop all running processes tracked in state.
73pub async fn stop(state: &mut State) -> Result<()> {
74    let mut running: Vec<&mut ProcessRecord> = state
75        .processes
76        .iter_mut()
77        .filter(|record| matches!(record.last_status, ProcessStatus::Running))
78        .collect();
79
80    let mut errors = Vec::new();
81
82    for record in &mut running {
83        if let Some(shutdown) = &record.shutdown_handle {
84            shutdown.store(true, Ordering::SeqCst);
85        }
86
87        if let Some(pid) = current_pid(record) {
88            println!(
89                "sending {} to {} (pid {})",
90                signal_name(Signal::SIGTERM),
91                record.id,
92                pid
93            );
94            if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
95                errors.push(format!(
96                    "failed to send {} to {} (pid {}): {}",
97                    signal_name(Signal::SIGTERM),
98                    record.id,
99                    pid,
100                    err
101                ));
102            }
103        }
104    }
105
106    let deadline = Instant::now() + Duration::from_secs(5);
107    for record in &mut running {
108        if let Some(pid) = current_pid(record) {
109            while process_alive(pid as i32) && Instant::now() < deadline {
110                sleep(Duration::from_millis(200)).await;
111            }
112            if process_alive(pid as i32) {
113                println!(
114                    "sending {} to {} (pid {})",
115                    signal_name(Signal::SIGKILL),
116                    record.id,
117                    pid
118                );
119                if let Err(err) = send_signal(pid as i32, Signal::SIGKILL) {
120                    errors.push(format!(
121                        "failed to send {} to {} (pid {}): {}",
122                        signal_name(Signal::SIGKILL),
123                        record.id,
124                        pid,
125                        err
126                    ));
127                }
128                if process_alive(pid as i32) {
129                    errors.push(format!(
130                        "{} (pid {}) still running after SIGKILL",
131                        record.id, pid
132                    ));
133                    record.last_status = ProcessStatus::Unknown;
134                    continue;
135                }
136            }
137            record.last_status = ProcessStatus::Stopped;
138            record.stopped_at = Some(OffsetDateTime::now_utc());
139            record.exit_code = None;
140            record.exit_signal = None;
141            continue;
142        }
143        errors.push(format!("missing pid for {} while stopping", record.id));
144        record.last_status = ProcessStatus::Unknown;
145    }
146
147    state.touch().await?;
148    if errors.is_empty() {
149        Ok(())
150    } else {
151        Err(anyhow!(errors.join("\n")))
152    }
153}
154
155/// Start a single node and optional sidecar.
156async fn start_node(
157    layout: &AssetsLayout,
158    node_id: u32,
159    total_nodes: u32,
160    rust_log: &str,
161) -> Result<Vec<RunningProcess>> {
162    let mut records = Vec::new();
163
164    let node_record = spawn_node(layout, node_id, total_nodes, rust_log).await?;
165    records.push(node_record);
166
167    if let Some(sidecar_record) = spawn_sidecar(layout, node_id, total_nodes, rust_log).await? {
168        records.push(sidecar_record);
169    }
170
171    Ok(records)
172}
173
174/// Spawn the embedded launcher to run a node process in this runtime.
175async fn spawn_node(
176    layout: &AssetsLayout,
177    node_id: u32,
178    total_nodes: u32,
179    rust_log: &str,
180) -> Result<RunningProcess> {
181    let node_dir = layout.node_dir(node_id);
182
183    let stdout_path = layout.node_logs_dir(node_id).join("stdout.log");
184    let stderr_path = layout.node_logs_dir(node_id).join("stderr.log");
185    create_log_symlinks(
186        layout,
187        node_id,
188        ProcessKind::Node,
189        &stdout_path,
190        &stderr_path,
191    )
192    .await?;
193
194    let mut launcher = Launcher::new_with_roots(
195        None,
196        layout.node_bin_dir(node_id),
197        layout.node_config_root(node_id),
198    )
199    .await?;
200    launcher.set_log_paths(stdout_path.clone(), stderr_path.clone());
201    launcher.set_cwd(node_dir.clone());
202    launcher.set_rust_log(rust_log.to_string());
203
204    let mut env = BTreeMap::new();
205    env.insert(
206        "CASPER_CONFIG_DIR".to_string(),
207        layout
208            .node_config_root(node_id)
209            .to_string_lossy()
210            .to_string(),
211    );
212    launcher.set_envs(env);
213
214    let (command_path, command_args) = launcher.current_command();
215    let child_pid = launcher.child_pid();
216    let shutdown = Arc::new(AtomicBool::new(false));
217    let shutdown_thread = Arc::clone(&shutdown);
218    let handle =
219        tokio::spawn(async move { launcher.run_with_shutdown(shutdown_thread.as_ref()).await });
220    let pid = wait_for_pid(child_pid.as_ref()).await;
221
222    Ok(RunningProcess {
223        record: ProcessRecord {
224            id: format!("node-{}", node_id),
225            node_id,
226            kind: ProcessKind::Node,
227            group: process_group(node_id, total_nodes),
228            command: command_path.to_string_lossy().to_string(),
229            args: command_args,
230            cwd: node_dir.to_string_lossy().to_string(),
231            pid,
232            pid_handle: Some(child_pid),
233            shutdown_handle: Some(shutdown),
234            stdout_path: stdout_path.to_string_lossy().to_string(),
235            stderr_path: stderr_path.to_string_lossy().to_string(),
236            started_at: Some(OffsetDateTime::now_utc()),
237            stopped_at: None,
238            exit_code: None,
239            exit_signal: None,
240            last_status: ProcessStatus::Running,
241        },
242        handle: ProcessHandle::Task(handle),
243    })
244}
245
246/// Spawn the sidecar process if binary and config are available.
247async fn spawn_sidecar(
248    layout: &AssetsLayout,
249    node_id: u32,
250    total_nodes: u32,
251    rust_log: &str,
252) -> Result<Option<RunningProcess>> {
253    let version_dir = layout.latest_protocol_version_dir(node_id).await?;
254    let command_path = layout
255        .node_bin_dir(node_id)
256        .join(&version_dir)
257        .join("casper-sidecar");
258    let config_path = layout
259        .node_config_root(node_id)
260        .join(&version_dir)
261        .join("sidecar.toml");
262
263    if !is_file(&command_path).await || !is_file(&config_path).await {
264        return Ok(None);
265    }
266
267    let node_dir = layout.node_dir(node_id);
268    let stdout_path = layout.node_logs_dir(node_id).join("sidecar-stdout.log");
269    let stderr_path = layout.node_logs_dir(node_id).join("sidecar-stderr.log");
270    create_log_symlinks(
271        layout,
272        node_id,
273        ProcessKind::Sidecar,
274        &stdout_path,
275        &stderr_path,
276    )
277    .await?;
278
279    let args = vec![
280        "--path-to-config".to_string(),
281        config_path.to_string_lossy().to_string(),
282    ];
283
284    let mut env = BTreeMap::new();
285    env.insert("RUST_LOG".to_string(), rust_log.to_string());
286
287    let child = spawn_process(
288        &command_path,
289        &args,
290        &env,
291        &node_dir,
292        &stdout_path,
293        &stderr_path,
294    )
295    .await?;
296    let pid = child.id();
297
298    Ok(Some(RunningProcess {
299        record: ProcessRecord {
300            id: format!("sidecar-{}", node_id),
301            node_id,
302            kind: ProcessKind::Sidecar,
303            group: process_group(node_id, total_nodes),
304            command: command_path.to_string_lossy().to_string(),
305            args,
306            cwd: node_dir.to_string_lossy().to_string(),
307            pid,
308            pid_handle: None,
309            shutdown_handle: None,
310            stdout_path: stdout_path.to_string_lossy().to_string(),
311            stderr_path: stderr_path.to_string_lossy().to_string(),
312            started_at: Some(OffsetDateTime::now_utc()),
313            stopped_at: None,
314            exit_code: None,
315            exit_signal: None,
316            last_status: ProcessStatus::Running,
317        },
318        handle: ProcessHandle::Child(child),
319    }))
320}
321
322/// Spawn a tokio-managed child process with redirected logs.
323async fn spawn_process(
324    command: &Path,
325    args: &[String],
326    env: &BTreeMap<String, String>,
327    cwd: &Path,
328    stdout_path: &Path,
329    stderr_path: &Path,
330) -> Result<tokio::process::Child> {
331    let stdout = OpenOptions::new()
332        .create(true)
333        .append(true)
334        .open(stdout_path)
335        .await?;
336    let stderr = OpenOptions::new()
337        .create(true)
338        .append(true)
339        .open(stderr_path)
340        .await?;
341    let stdout = stdout.into_std().await;
342    let stderr = stderr.into_std().await;
343
344    let mut cmd = Command::new(command);
345    cmd.args(args)
346        .envs(env)
347        .current_dir(cwd)
348        .stdout(stdout)
349        .stderr(stderr);
350
351    Ok(cmd.spawn()?)
352}
353
354async fn create_log_symlinks(
355    layout: &AssetsLayout,
356    node_id: u32,
357    kind: ProcessKind,
358    stdout_path: &Path,
359    stderr_path: &Path,
360) -> Result<()> {
361    let data_dir = layout.net_dir();
362    tokio_fs::create_dir_all(&data_dir).await?;
363    let prefix = match kind {
364        ProcessKind::Node => format!("node-{}", node_id),
365        ProcessKind::Sidecar => format!("sidecar-{}", node_id),
366    };
367    let stdout_link = data_dir.join(format!("{}.stdout", prefix));
368    let stderr_link = data_dir.join(format!("{}.stderr", prefix));
369    create_symlink(&stdout_link, stdout_path).await?;
370    create_symlink(&stderr_link, stderr_path).await?;
371    Ok(())
372}
373
374async fn create_symlink(link_path: &Path, target_path: &Path) -> Result<()> {
375    if let Ok(metadata) = tokio_fs::symlink_metadata(link_path).await {
376        if metadata.is_dir() {
377            tokio_fs::remove_dir_all(link_path).await?;
378        } else {
379            tokio_fs::remove_file(link_path).await?;
380        }
381    }
382    tokio_fs::symlink(target_path, link_path).await?;
383    Ok(())
384}
385
386fn process_group(node_id: u32, total_nodes: u32) -> ProcessGroup {
387    let genesis_nodes = total_nodes / 2;
388    if node_id <= BOOTSTRAP_NODES.min(genesis_nodes) {
389        ProcessGroup::Validators1
390    } else if node_id <= genesis_nodes {
391        ProcessGroup::Validators2
392    } else {
393        ProcessGroup::Validators3
394    }
395}
396
397fn process_alive(pid: i32) -> bool {
398    match kill(Pid::from_raw(pid), None) {
399        Ok(()) => true,
400        Err(Errno::ESRCH) => false,
401        Err(_) => true,
402    }
403}
404
405fn send_signal(target: i32, signal: Signal) -> Result<()> {
406    match kill(Pid::from_raw(target), signal) {
407        Ok(()) => Ok(()),
408        Err(Errno::ESRCH) => Ok(()),
409        Err(err) => Err(anyhow!(err)),
410    }
411}
412
413fn signal_name(signal: Signal) -> &'static str {
414    match signal {
415        Signal::SIGTERM => "SIGTERM",
416        Signal::SIGKILL => "SIGKILL",
417        _ => "signal",
418    }
419}
420
421async fn is_file(path: &Path) -> bool {
422    tokio_fs::metadata(path)
423        .await
424        .map(|meta| meta.is_file())
425        .unwrap_or(false)
426}
427
428async fn wait_for_pid(pid_handle: &AtomicU32) -> Option<u32> {
429    let deadline = Instant::now() + Duration::from_secs(1);
430    loop {
431        let pid = pid_handle.load(Ordering::SeqCst);
432        if pid != 0 {
433            return Some(pid);
434        }
435        if Instant::now() >= deadline {
436            return None;
437        }
438        sleep(Duration::from_millis(20)).await;
439    }
440}
441
442fn current_pid(record: &ProcessRecord) -> Option<u32> {
443    if let Some(handle) = &record.pid_handle {
444        let pid = handle.load(Ordering::SeqCst);
445        if pid != 0 {
446            return Some(pid);
447        }
448    }
449    record.pid
450}