casper_devnet/
process.rs

1use crate::assets::{AssetsLayout, BOOTSTRAP_NODES};
2use crate::node_launcher::Launcher;
3use crate::state::{ProcessGroup, ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{anyhow, Result};
5use nix::errno::Errno;
6use nix::sys::signal::{kill, Signal};
7use nix::unistd::Pid;
8use std::collections::BTreeMap;
9use std::path::Path;
10use std::sync::{
11    atomic::{AtomicBool, AtomicU32, Ordering},
12    Arc,
13};
14use std::time::{Duration, Instant};
15use time::OffsetDateTime;
16use tokio::fs as tokio_fs;
17use tokio::fs::OpenOptions;
18use tokio::process::Command;
19use tokio::time::sleep;
20
/// A started process plus its handle for lifecycle tracking.
///
/// `record` is the metadata that `start` copies into `State::processes`;
/// `handle` is the live, in-process handle that is not part of that copy.
pub struct RunningProcess {
    /// Descriptive record (id, command, args, pid, log paths, status, ...).
    pub record: ProcessRecord,
    /// Live handle: a launcher task for nodes, an OS child for sidecars.
    pub handle: ProcessHandle,
}
26
/// Handle types for started processes.
pub enum ProcessHandle {
    /// A directly spawned child process (used by `spawn_sidecar`).
    Child(tokio::process::Child),
    /// A tokio task running the embedded node launcher (used by `spawn_node`);
    /// it resolves to whatever `run_with_shutdown` returns.
    Task(tokio::task::JoinHandle<Result<()>>),
}
32
/// Start parameters derived from CLI arguments.
pub struct StartPlan {
    /// `RUST_LOG` value handed to every node launcher and sidecar.
    pub rust_log: String,
}
37
38/// Start all nodes (and sidecars if available), updating state on success.
39pub async fn start(
40    layout: &AssetsLayout,
41    plan: &StartPlan,
42    state: &mut State,
43) -> Result<Vec<RunningProcess>> {
44    let total_nodes = layout.count_nodes().await?;
45    if total_nodes == 0 {
46        return Err(anyhow!(
47            "no nodes found under {}",
48            layout.nodes_dir().display()
49        ));
50    }
51    let node_ids: Vec<u32> = (1..=total_nodes).collect();
52
53    let mut started = Vec::new();
54    for node_id in node_ids {
55        if node_id > total_nodes {
56            return Err(anyhow!(
57                "node {} exceeds total nodes {}",
58                node_id,
59                total_nodes
60            ));
61        }
62        let mut records = start_node(layout, node_id, total_nodes, &plan.rust_log).await?;
63        started.append(&mut records);
64    }
65
66    state.processes = started.iter().map(|proc| proc.record.clone()).collect();
67    state.touch().await?;
68
69    Ok(started)
70}
71
72/// Stop all running processes tracked in state.
73pub async fn stop(state: &mut State) -> Result<()> {
74    let mut running: Vec<&mut ProcessRecord> = state
75        .processes
76        .iter_mut()
77        .filter(|record| matches!(record.last_status, ProcessStatus::Running))
78        .collect();
79
80    let mut errors = Vec::new();
81
82    for record in &mut running {
83        if let Some(shutdown) = &record.shutdown_handle {
84            shutdown.store(true, Ordering::SeqCst);
85        }
86
87        if let Some(pid) = current_pid(record) {
88            println!(
89                "sending {} to {} (pid {})",
90                signal_name(Signal::SIGTERM),
91                record.id,
92                pid
93            );
94            if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
95                errors.push(format!(
96                    "failed to send {} to {} (pid {}): {}",
97                    signal_name(Signal::SIGTERM),
98                    record.id,
99                    pid,
100                    err
101                ));
102            }
103        }
104    }
105
106    let deadline = Instant::now() + Duration::from_secs(5);
107    for record in &mut running {
108        if let Some(pid) = current_pid(record) {
109            while process_alive(pid as i32) && Instant::now() < deadline {
110                sleep(Duration::from_millis(200)).await;
111            }
112            if process_alive(pid as i32) {
113                println!(
114                    "sending {} to {} (pid {})",
115                    signal_name(Signal::SIGKILL),
116                    record.id,
117                    pid
118                );
119                if let Err(err) = send_signal(pid as i32, Signal::SIGKILL) {
120                    errors.push(format!(
121                        "failed to send {} to {} (pid {}): {}",
122                        signal_name(Signal::SIGKILL),
123                        record.id,
124                        pid,
125                        err
126                    ));
127                }
128                if process_alive(pid as i32) {
129                    errors.push(format!(
130                        "{} (pid {}) still running after SIGKILL",
131                        record.id, pid
132                    ));
133                    record.last_status = ProcessStatus::Unknown;
134                    continue;
135                }
136            }
137            record.last_status = ProcessStatus::Stopped;
138            record.stopped_at = Some(OffsetDateTime::now_utc());
139            record.exit_code = None;
140            record.exit_signal = None;
141            continue;
142        }
143        errors.push(format!("missing pid for {} while stopping", record.id));
144        record.last_status = ProcessStatus::Unknown;
145    }
146
147    state.touch().await?;
148    if errors.is_empty() {
149        Ok(())
150    } else {
151        Err(anyhow!(errors.join("\n")))
152    }
153}
154
155/// Start a single node and optional sidecar.
156async fn start_node(
157    layout: &AssetsLayout,
158    node_id: u32,
159    total_nodes: u32,
160    rust_log: &str,
161) -> Result<Vec<RunningProcess>> {
162    let mut records = Vec::new();
163
164    let node_record = spawn_node(layout, node_id, total_nodes, rust_log).await?;
165    records.push(node_record);
166
167    if let Some(sidecar_record) = spawn_sidecar(layout, node_id, total_nodes, rust_log).await? {
168        records.push(sidecar_record);
169    }
170
171    Ok(records)
172}
173
174/// Spawn the embedded launcher to run a node process in this runtime.
175async fn spawn_node(
176    layout: &AssetsLayout,
177    node_id: u32,
178    total_nodes: u32,
179    rust_log: &str,
180) -> Result<RunningProcess> {
181    let node_dir = layout.node_dir(node_id);
182
183    let stdout_path = layout.node_logs_dir(node_id).join("stdout.log");
184    let stderr_path = layout.node_logs_dir(node_id).join("stderr.log");
185
186    let mut launcher = Launcher::new_with_roots(
187        None,
188        layout.node_bin_dir(node_id),
189        layout.node_config_root(node_id),
190    )
191    .await?;
192    launcher.set_log_paths(stdout_path.clone(), stderr_path.clone());
193    launcher.set_cwd(node_dir.clone());
194    launcher.set_rust_log(rust_log.to_string());
195
196    let mut env = BTreeMap::new();
197    env.insert(
198        "CASPER_CONFIG_DIR".to_string(),
199        layout
200            .node_config_root(node_id)
201            .to_string_lossy()
202            .to_string(),
203    );
204    launcher.set_envs(env);
205
206    let (command_path, command_args) = launcher.current_command();
207    let child_pid = launcher.child_pid();
208    let shutdown = Arc::new(AtomicBool::new(false));
209    let shutdown_thread = Arc::clone(&shutdown);
210    let handle =
211        tokio::spawn(async move { launcher.run_with_shutdown(shutdown_thread.as_ref()).await });
212    let pid = wait_for_pid(child_pid.as_ref()).await;
213
214    Ok(RunningProcess {
215        record: ProcessRecord {
216            id: format!("node-{}", node_id),
217            node_id,
218            kind: ProcessKind::Node,
219            group: process_group(node_id, total_nodes),
220            command: command_path.to_string_lossy().to_string(),
221            args: command_args,
222            cwd: node_dir.to_string_lossy().to_string(),
223            pid,
224            pid_handle: Some(child_pid),
225            shutdown_handle: Some(shutdown),
226            stdout_path: stdout_path.to_string_lossy().to_string(),
227            stderr_path: stderr_path.to_string_lossy().to_string(),
228            started_at: Some(OffsetDateTime::now_utc()),
229            stopped_at: None,
230            exit_code: None,
231            exit_signal: None,
232            last_status: ProcessStatus::Running,
233        },
234        handle: ProcessHandle::Task(handle),
235    })
236}
237
238/// Spawn the sidecar process if binary and config are available.
239async fn spawn_sidecar(
240    layout: &AssetsLayout,
241    node_id: u32,
242    total_nodes: u32,
243    rust_log: &str,
244) -> Result<Option<RunningProcess>> {
245    let version_dir = layout.latest_protocol_version_dir(node_id).await?;
246    let command_path = layout
247        .node_bin_dir(node_id)
248        .join(&version_dir)
249        .join("casper-sidecar");
250    let config_path = layout
251        .node_config_root(node_id)
252        .join(&version_dir)
253        .join("sidecar.toml");
254
255    if !is_file(&command_path).await || !is_file(&config_path).await {
256        return Ok(None);
257    }
258
259    let node_dir = layout.node_dir(node_id);
260    let stdout_path = layout.node_logs_dir(node_id).join("sidecar-stdout.log");
261    let stderr_path = layout.node_logs_dir(node_id).join("sidecar-stderr.log");
262
263    let args = vec![
264        "--path-to-config".to_string(),
265        config_path.to_string_lossy().to_string(),
266    ];
267
268    let mut env = BTreeMap::new();
269    env.insert("RUST_LOG".to_string(), rust_log.to_string());
270
271    let child = spawn_process(
272        &command_path,
273        &args,
274        &env,
275        &node_dir,
276        &stdout_path,
277        &stderr_path,
278    )
279    .await?;
280    let pid = child.id();
281
282    Ok(Some(RunningProcess {
283        record: ProcessRecord {
284            id: format!("sidecar-{}", node_id),
285            node_id,
286            kind: ProcessKind::Sidecar,
287            group: process_group(node_id, total_nodes),
288            command: command_path.to_string_lossy().to_string(),
289            args,
290            cwd: node_dir.to_string_lossy().to_string(),
291            pid,
292            pid_handle: None,
293            shutdown_handle: None,
294            stdout_path: stdout_path.to_string_lossy().to_string(),
295            stderr_path: stderr_path.to_string_lossy().to_string(),
296            started_at: Some(OffsetDateTime::now_utc()),
297            stopped_at: None,
298            exit_code: None,
299            exit_signal: None,
300            last_status: ProcessStatus::Running,
301        },
302        handle: ProcessHandle::Child(child),
303    }))
304}
305
306/// Spawn a tokio-managed child process with redirected logs.
307async fn spawn_process(
308    command: &Path,
309    args: &[String],
310    env: &BTreeMap<String, String>,
311    cwd: &Path,
312    stdout_path: &Path,
313    stderr_path: &Path,
314) -> Result<tokio::process::Child> {
315    let stdout = OpenOptions::new()
316        .create(true)
317        .append(true)
318        .open(stdout_path)
319        .await?;
320    let stderr = OpenOptions::new()
321        .create(true)
322        .append(true)
323        .open(stderr_path)
324        .await?;
325    let stdout = stdout.into_std().await;
326    let stderr = stderr.into_std().await;
327
328    let mut cmd = Command::new(command);
329    cmd.args(args)
330        .envs(env)
331        .current_dir(cwd)
332        .stdout(stdout)
333        .stderr(stderr);
334
335    Ok(cmd.spawn()?)
336}
337
338fn process_group(node_id: u32, total_nodes: u32) -> ProcessGroup {
339    let genesis_nodes = total_nodes / 2;
340    if node_id <= BOOTSTRAP_NODES.min(genesis_nodes) {
341        ProcessGroup::Validators1
342    } else if node_id <= genesis_nodes {
343        ProcessGroup::Validators2
344    } else {
345        ProcessGroup::Validators3
346    }
347}
348
349fn process_alive(pid: i32) -> bool {
350    match kill(Pid::from_raw(pid), None) {
351        Ok(()) => true,
352        Err(Errno::ESRCH) => false,
353        Err(_) => true,
354    }
355}
356
357fn send_signal(target: i32, signal: Signal) -> Result<()> {
358    match kill(Pid::from_raw(target), signal) {
359        Ok(()) => Ok(()),
360        Err(Errno::ESRCH) => Ok(()),
361        Err(err) => Err(anyhow!(err)),
362    }
363}
364
365fn signal_name(signal: Signal) -> &'static str {
366    match signal {
367        Signal::SIGTERM => "SIGTERM",
368        Signal::SIGKILL => "SIGKILL",
369        _ => "signal",
370    }
371}
372
373async fn is_file(path: &Path) -> bool {
374    tokio_fs::metadata(path)
375        .await
376        .map(|meta| meta.is_file())
377        .unwrap_or(false)
378}
379
380async fn wait_for_pid(pid_handle: &AtomicU32) -> Option<u32> {
381    let deadline = Instant::now() + Duration::from_secs(1);
382    loop {
383        let pid = pid_handle.load(Ordering::SeqCst);
384        if pid != 0 {
385            return Some(pid);
386        }
387        if Instant::now() >= deadline {
388            return None;
389        }
390        sleep(Duration::from_millis(20)).await;
391    }
392}
393
394fn current_pid(record: &ProcessRecord) -> Option<u32> {
395    if let Some(handle) = &record.pid_handle {
396        let pid = handle.load(Ordering::SeqCst);
397        if pid != 0 {
398            return Some(pid);
399        }
400    }
401    record.pid
402}