// casper_devnet/process.rs
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::{
    Arc,
    atomic::{AtomicBool, AtomicU32, Ordering},
};
use std::time::{Duration, Instant};

use anyhow::{Result, anyhow};
use nix::errno::Errno;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use time::OffsetDateTime;
use tokio::fs as tokio_fs;
use tokio::fs::OpenOptions;
use tokio::process::Command;
use tokio::time::sleep;

use crate::assets::{AssetsLayout, BOOTSTRAP_NODES};
use crate::node_launcher::Launcher;
use crate::state::{ProcessGroup, ProcessKind, ProcessRecord, ProcessStatus, State};
20
/// A started process plus its handle for lifecycle tracking.
pub struct RunningProcess {
    /// Persisted metadata (id, pid, log paths, status) for this process.
    pub record: ProcessRecord,
    /// Live handle used to await, time out, or abort the underlying
    /// child process or launcher task.
    pub handle: ProcessHandle,
}
26
/// Handle types for started processes.
pub enum ProcessHandle {
    /// A directly spawned OS child process (used for the sidecar).
    Child(tokio::process::Child),
    /// An in-runtime tokio task driving the embedded node launcher.
    Task(tokio::task::JoinHandle<Result<()>>),
}
32
/// Start parameters derived from CLI arguments.
pub struct StartPlan {
    // Log filter string forwarded to spawned processes (set as RUST_LOG for
    // the sidecar and passed to the node launcher via `set_rust_log`).
    pub rust_log: String,
}
37
38/// Start all nodes (and sidecars if available), updating state on success.
39pub async fn start(
40    layout: &AssetsLayout,
41    plan: &StartPlan,
42    state: &mut State,
43) -> Result<Vec<RunningProcess>> {
44    let total_nodes = layout.count_nodes().await?;
45    if total_nodes == 0 {
46        return Err(anyhow!(
47            "no nodes found under {}",
48            layout.nodes_dir().display()
49        ));
50    }
51    let node_ids: Vec<u32> = (1..=total_nodes).collect();
52
53    let mut started = Vec::new();
54    for node_id in node_ids {
55        if node_id > total_nodes {
56            let err = anyhow!("node {} exceeds total nodes {}", node_id, total_nodes);
57            return Err(cleanup_started_processes(err, started).await);
58        }
59        match start_node(layout, node_id, total_nodes, &plan.rust_log).await {
60            Ok(mut records) => started.append(&mut records),
61            Err(err) => return Err(cleanup_started_processes(err, started).await),
62        }
63    }
64
65    state.processes = started.iter().map(|proc| proc.record.clone()).collect();
66    if let Err(err) = state.touch().await {
67        state.processes.clear();
68        return Err(cleanup_started_processes(err, started).await);
69    }
70
71    Ok(started)
72}
73
74async fn cleanup_started_processes(
75    err: anyhow::Error,
76    started: Vec<RunningProcess>,
77) -> anyhow::Error {
78    let cleanup_errors = stop_started_processes(started).await;
79    if cleanup_errors.is_empty() {
80        err
81    } else {
82        anyhow!(
83            "{}; failed to clean up partially started processes: {}",
84            err,
85            cleanup_errors.join("; ")
86        )
87    }
88}
89
/// Best-effort teardown of processes started during the current call, used
/// when startup fails partway through. Collects human-readable error strings
/// instead of failing fast, so every process gets a termination attempt.
async fn stop_started_processes(processes: Vec<RunningProcess>) -> Vec<String> {
    let mut errors = Vec::new();

    // Phase 1: request graceful shutdown — flip each cooperative shutdown
    // flag, then send SIGTERM to every process whose pid is known.
    for running in &processes {
        if let Some(shutdown) = &running.record.shutdown_handle {
            shutdown.store(true, Ordering::SeqCst);
        }

        let Some(pid) = current_pid(&running.record) else {
            continue;
        };
        if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
            errors.push(format!(
                "failed to send {} to {} (pid {}): {}",
                signal_name(Signal::SIGTERM),
                running.record.id,
                pid,
                err
            ));
        }
    }

    // Phase 2: give everything a 5-second grace period (one deadline shared
    // by all processes, not per-process), then SIGKILL any survivor.
    let deadline = Instant::now() + Duration::from_secs(5);
    for running in &processes {
        let Some(pid) = current_pid(&running.record) else {
            continue;
        };
        while process_alive(pid as i32) && Instant::now() < deadline {
            sleep(Duration::from_millis(100)).await;
        }
        if process_alive(pid as i32)
            && let Err(err) = send_signal(pid as i32, Signal::SIGKILL)
        {
            errors.push(format!(
                "failed to send {} to {} (pid {}): {}",
                signal_name(Signal::SIGKILL),
                running.record.id,
                pid,
                err
            ));
        }
    }

    // Phase 3: reap the tokio-side handles so child processes are waited on
    // (no zombies) and launcher tasks are aborted rather than leaked. Each
    // handle gets one second before being force-killed/aborted.
    for running in processes {
        match running.handle {
            ProcessHandle::Child(mut child) => {
                if tokio::time::timeout(Duration::from_secs(1), child.wait())
                    .await
                    .is_err()
                {
                    let _ = child.start_kill();
                    let _ = child.wait().await;
                }
            }
            ProcessHandle::Task(mut handle) => {
                tokio::select! {
                    _ = &mut handle => {}
                    _ = sleep(Duration::from_secs(1)) => {
                        handle.abort();
                        let _ = handle.await;
                    }
                }
            }
        }
    }

    errors
}
158
159/// Stop all running processes tracked in state.
160pub async fn stop(state: &mut State) -> Result<()> {
161    let mut running: Vec<&mut ProcessRecord> = state
162        .processes
163        .iter_mut()
164        .filter(|record| matches!(record.last_status, ProcessStatus::Running))
165        .collect();
166
167    let mut errors = Vec::new();
168
169    for record in &mut running {
170        if let Some(shutdown) = &record.shutdown_handle {
171            shutdown.store(true, Ordering::SeqCst);
172        }
173
174        if let Some(pid) = current_pid(record) {
175            eprintln!(
176                "sending {} to {} (pid {})",
177                signal_name(Signal::SIGTERM),
178                record.id,
179                pid
180            );
181            if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
182                errors.push(format!(
183                    "failed to send {} to {} (pid {}): {}",
184                    signal_name(Signal::SIGTERM),
185                    record.id,
186                    pid,
187                    err
188                ));
189            }
190        }
191    }
192
193    let deadline = Instant::now() + Duration::from_secs(5);
194    for record in &mut running {
195        if let Some(pid) = current_pid(record) {
196            while process_alive(pid as i32) && Instant::now() < deadline {
197                sleep(Duration::from_millis(200)).await;
198            }
199            if process_alive(pid as i32) {
200                eprintln!(
201                    "sending {} to {} (pid {})",
202                    signal_name(Signal::SIGKILL),
203                    record.id,
204                    pid
205                );
206                if let Err(err) = send_signal(pid as i32, Signal::SIGKILL) {
207                    errors.push(format!(
208                        "failed to send {} to {} (pid {}): {}",
209                        signal_name(Signal::SIGKILL),
210                        record.id,
211                        pid,
212                        err
213                    ));
214                }
215                if process_alive(pid as i32) {
216                    errors.push(format!(
217                        "{} (pid {}) still running after SIGKILL",
218                        record.id, pid
219                    ));
220                    record.last_status = ProcessStatus::Unknown;
221                    continue;
222                }
223            }
224            record.last_status = ProcessStatus::Stopped;
225            record.stopped_at = Some(OffsetDateTime::now_utc());
226            record.exit_code = None;
227            record.exit_signal = None;
228            continue;
229        }
230        errors.push(format!("missing pid for {} while stopping", record.id));
231        record.last_status = ProcessStatus::Unknown;
232    }
233
234    state.touch().await?;
235    if errors.is_empty() {
236        Ok(())
237    } else {
238        Err(anyhow!(errors.join("\n")))
239    }
240}
241
/// Restart sidecars for all nodes, updating persisted process state.
///
/// For each node: stop the currently tracked sidecar if any (SIGTERM, up to
/// 5s wait, then SIGKILL), clear its runtime fields, then spawn a fresh
/// sidecar and swap the new record into state. Errors are collected per node
/// so one failing sidecar does not block the others; state is persisted
/// either way and the joined error list is returned at the end.
pub async fn restart_sidecars(
    layout: &AssetsLayout,
    state: &mut State,
    rust_log: &str,
) -> Result<Vec<RunningProcess>> {
    let total_nodes = layout.count_nodes().await?;
    if total_nodes == 0 {
        return Err(anyhow!(
            "no nodes found under {}",
            layout.nodes_dir().display()
        ));
    }

    let mut restarted = Vec::new();
    let mut errors = Vec::new();

    for node_id in 1..=total_nodes {
        // Stop the existing sidecar record for this node, if one is tracked.
        if let Some(record) = state
            .processes
            .iter_mut()
            .find(|record| record.node_id == node_id && matches!(record.kind, ProcessKind::Sidecar))
        {
            if let Some(pid) = current_pid(record) {
                if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
                    errors.push(format!(
                        "failed to send {} to {} (pid {}): {}",
                        signal_name(Signal::SIGTERM),
                        record.id,
                        pid,
                        err
                    ));
                // Graceful stop timed out — escalate; record only escalation
                // failures (a successful SIGKILL is silent).
                } else if !wait_for_process_exit(pid as i32, Duration::from_secs(5)).await
                    && let Err(err) = send_signal(pid as i32, Signal::SIGKILL)
                {
                    errors.push(format!(
                        "failed to send {} to {} (pid {}): {}",
                        signal_name(Signal::SIGKILL),
                        record.id,
                        pid,
                        err
                    ));
                }
            }
            // Clear runtime fields regardless of how the stop went; the
            // spawn below (or the error list) is the source of truth now.
            record.last_status = ProcessStatus::Stopped;
            record.stopped_at = Some(OffsetDateTime::now_utc());
            record.exit_code = None;
            record.exit_signal = None;
            record.pid = None;
            record.pid_handle = None;
            record.shutdown_handle = None;
        }

        match spawn_sidecar(layout, node_id, total_nodes, rust_log).await {
            Ok(Some(running)) => {
                // Replace the old record in place, or track a brand-new one.
                if let Some(slot) = state.processes.iter_mut().find(|record| {
                    record.node_id == node_id && matches!(record.kind, ProcessKind::Sidecar)
                }) {
                    *slot = running.record.clone();
                } else {
                    state.processes.push(running.record.clone());
                }
                restarted.push(running);
            }
            // No sidecar binary/config for this node — nothing to restart.
            Ok(None) => {}
            Err(err) => {
                errors.push(format!(
                    "failed to restart sidecar for node-{}: {}",
                    node_id, err
                ));
            }
        }
    }

    state.touch().await?;
    if errors.is_empty() {
        Ok(restarted)
    } else {
        Err(anyhow!(errors.join("\n")))
    }
}
323
/// Start newly added nodes and track their node/sidecar processes in state.
///
/// Refuses to start any node that already has a `Running` record. On any
/// failure — including a failure to persist state — every process started by
/// this call is torn down, and records inserted into state are removed again.
pub async fn start_added_nodes(
    layout: &AssetsLayout,
    state: &mut State,
    node_ids: &[u32],
    total_nodes: u32,
    rust_log: &str,
) -> Result<Vec<RunningProcess>> {
    let mut started = Vec::new();

    for node_id in node_ids {
        if state.processes.iter().any(|record| {
            record.node_id == *node_id && matches!(record.last_status, ProcessStatus::Running)
        }) {
            let err = anyhow!("node-{} already has running process records", node_id);
            return Err(cleanup_started_processes(err, started).await);
        }

        match start_node(layout, *node_id, total_nodes, rust_log).await {
            Ok(mut records) => started.append(&mut records),
            Err(err) => return Err(cleanup_started_processes(err, started).await),
        }
    }

    // Remember which record ids this call produced so a failed persist below
    // can roll them back out of state.
    let process_ids = started
        .iter()
        .map(|proc| proc.record.id.clone())
        .collect::<Vec<_>>();
    for running in &started {
        // Match existing slots on (node_id, kind variant); discriminant
        // comparison ignores any payload carried by the enum variants.
        if let Some(slot) = state.processes.iter_mut().find(|record| {
            record.node_id == running.record.node_id
                && std::mem::discriminant(&record.kind)
                    == std::mem::discriminant(&running.record.kind)
        }) {
            *slot = running.record.clone();
        } else {
            state.processes.push(running.record.clone());
        }
    }

    if let Err(err) = state.touch().await {
        // Roll back the records we just inserted, then stop the processes.
        state
            .processes
            .retain(|record| !process_ids.iter().any(|id| id == &record.id));
        return Err(cleanup_started_processes(err, started).await);
    }

    Ok(started)
}
373
374/// Start a single node and optional sidecar.
375async fn start_node(
376    layout: &AssetsLayout,
377    node_id: u32,
378    total_nodes: u32,
379    rust_log: &str,
380) -> Result<Vec<RunningProcess>> {
381    let mut records = Vec::new();
382
383    let node_record = spawn_node(layout, node_id, total_nodes, rust_log).await?;
384    records.push(node_record);
385
386    match spawn_sidecar(layout, node_id, total_nodes, rust_log).await {
387        Ok(Some(sidecar_record)) => records.push(sidecar_record),
388        Ok(None) => {}
389        Err(err) => return Err(cleanup_started_processes(err, records).await),
390    }
391
392    Ok(records)
393}
394
/// Spawn the embedded launcher to run a node process in this runtime.
///
/// The node runs as a tokio task driving `Launcher` (not a direct child
/// process); the launcher's shared pid slot and a cooperative shutdown flag
/// are stored on the returned record so `stop`/cleanup can reach the
/// underlying OS process later.
async fn spawn_node(
    layout: &AssetsLayout,
    node_id: u32,
    total_nodes: u32,
    rust_log: &str,
) -> Result<RunningProcess> {
    let node_dir = layout.node_dir(node_id);

    // Per-node log files, plus stable symlinks under the net dir.
    let stdout_path = layout.node_logs_dir(node_id).join("stdout.log");
    let stderr_path = layout.node_logs_dir(node_id).join("stderr.log");
    create_log_symlinks(
        layout,
        node_id,
        ProcessKind::Node,
        &stdout_path,
        &stderr_path,
    )
    .await?;

    let mut launcher = Launcher::new_with_roots(
        None,
        layout.node_bin_dir(node_id),
        layout.node_config_root(node_id),
    )
    .await?;
    launcher.set_log_paths(stdout_path.clone(), stderr_path.clone());
    launcher.set_cwd(node_dir.clone());
    launcher.set_hook_context(layout.net_dir(), layout.hooks_dir());
    launcher.set_rust_log(rust_log.to_string());

    let mut env = BTreeMap::new();
    env.insert(
        "CASPER_CONFIG_DIR".to_string(),
        layout
            .node_config_root(node_id)
            .to_string_lossy()
            .to_string(),
    );
    launcher.set_envs(env);

    // Capture command info and shared handles before the launcher is moved
    // into the spawned task.
    let (command_path, command_args) = launcher.current_command();
    let child_pid = launcher.child_pid();
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_thread = Arc::clone(&shutdown);
    let handle =
        tokio::spawn(async move { launcher.run_with_shutdown(shutdown_thread.as_ref()).await });
    // The launcher publishes the OS pid asynchronously through the shared
    // AtomicU32; wait briefly for it (may be None if it is slow to appear).
    let pid = wait_for_pid(child_pid.as_ref()).await;

    Ok(RunningProcess {
        record: ProcessRecord {
            id: format!("node-{}", node_id),
            node_id,
            kind: ProcessKind::Node,
            group: process_group(node_id, total_nodes),
            command: command_path.to_string_lossy().to_string(),
            args: command_args,
            cwd: node_dir.to_string_lossy().to_string(),
            pid,
            pid_handle: Some(child_pid),
            shutdown_handle: Some(shutdown),
            stdout_path: stdout_path.to_string_lossy().to_string(),
            stderr_path: stderr_path.to_string_lossy().to_string(),
            started_at: Some(OffsetDateTime::now_utc()),
            stopped_at: None,
            exit_code: None,
            exit_signal: None,
            last_status: ProcessStatus::Running,
        },
        handle: ProcessHandle::Task(handle),
    })
}
467
/// Spawn the sidecar process if binary and config are available.
///
/// Returns `Ok(None)` when the node has no `casper-sidecar` binary or no
/// `sidecar.toml` under its latest protocol version directory. Unlike the
/// node (which runs via the embedded launcher), the sidecar is a real OS
/// child process, so the record carries a pid but no pid/shutdown handles.
async fn spawn_sidecar(
    layout: &AssetsLayout,
    node_id: u32,
    total_nodes: u32,
    rust_log: &str,
) -> Result<Option<RunningProcess>> {
    // Binary and config both live under the latest protocol version dir.
    let version_dir = layout.latest_protocol_version_dir(node_id).await?;
    let command_path = layout
        .node_bin_dir(node_id)
        .join(&version_dir)
        .join("casper-sidecar");
    let config_path = layout
        .node_config_root(node_id)
        .join(&version_dir)
        .join("sidecar.toml");

    if !is_file(&command_path).await || !is_file(&config_path).await {
        return Ok(None);
    }

    let node_dir = layout.node_dir(node_id);
    let logs_dir = layout.node_logs_dir(node_id);
    // Stable alias names that are symlinked to per-version target files, so
    // each protocol version writes its own log file behind a fixed path.
    let stdout_alias_path = logs_dir.join("sidecar-stdout.log");
    let stderr_alias_path = logs_dir.join("sidecar-stderr.log");
    let (stdout_target_path, stderr_target_path) =
        prepare_versioned_log_aliases(&stdout_alias_path, &stderr_alias_path, &version_dir).await?;
    create_log_symlinks(
        layout,
        node_id,
        ProcessKind::Sidecar,
        &stdout_alias_path,
        &stderr_alias_path,
    )
    .await?;

    let args = vec![
        "--path-to-config".to_string(),
        config_path.to_string_lossy().to_string(),
    ];

    let mut env = BTreeMap::new();
    env.insert("RUST_LOG".to_string(), rust_log.to_string());

    // The child writes to the versioned targets; the record stores the alias
    // paths, which symlink to those targets.
    let child = spawn_process(
        &command_path,
        &args,
        &env,
        &node_dir,
        &stdout_target_path,
        &stderr_target_path,
    )
    .await?;
    let pid = child.id();

    Ok(Some(RunningProcess {
        record: ProcessRecord {
            id: format!("sidecar-{}", node_id),
            node_id,
            kind: ProcessKind::Sidecar,
            group: process_group(node_id, total_nodes),
            command: command_path.to_string_lossy().to_string(),
            args,
            cwd: node_dir.to_string_lossy().to_string(),
            pid,
            pid_handle: None,
            shutdown_handle: None,
            stdout_path: stdout_alias_path.to_string_lossy().to_string(),
            stderr_path: stderr_alias_path.to_string_lossy().to_string(),
            started_at: Some(OffsetDateTime::now_utc()),
            stopped_at: None,
            exit_code: None,
            exit_signal: None,
            last_status: ProcessStatus::Running,
        },
        handle: ProcessHandle::Child(child),
    }))
}
546
547/// Spawn a tokio-managed child process with redirected logs.
548async fn spawn_process(
549    command: &Path,
550    args: &[String],
551    env: &BTreeMap<String, String>,
552    cwd: &Path,
553    stdout_path: &Path,
554    stderr_path: &Path,
555) -> Result<tokio::process::Child> {
556    let stdout = OpenOptions::new()
557        .create(true)
558        .append(true)
559        .open(stdout_path)
560        .await?;
561    let stderr = OpenOptions::new()
562        .create(true)
563        .append(true)
564        .open(stderr_path)
565        .await?;
566    let stdout = stdout.into_std().await;
567    let stderr = stderr.into_std().await;
568
569    let mut cmd = Command::new(command);
570    cmd.args(args)
571        .envs(env)
572        .current_dir(cwd)
573        .stdout(stdout)
574        .stderr(stderr);
575
576    Ok(cmd.spawn()?)
577}
578
579async fn create_log_symlinks(
580    layout: &AssetsLayout,
581    node_id: u32,
582    kind: ProcessKind,
583    stdout_path: &Path,
584    stderr_path: &Path,
585) -> Result<()> {
586    let data_dir = layout.net_dir();
587    tokio_fs::create_dir_all(&data_dir).await?;
588    let prefix = match kind {
589        ProcessKind::Node => format!("node-{}", node_id),
590        ProcessKind::Sidecar => format!("sidecar-{}", node_id),
591    };
592    let stdout_link = data_dir.join(format!("{}.stdout", prefix));
593    let stderr_link = data_dir.join(format!("{}.stderr", prefix));
594    create_symlink(&stdout_link, stdout_path).await?;
595    create_symlink(&stderr_link, stderr_path).await?;
596    Ok(())
597}
598
599async fn create_symlink(link_path: &Path, target_path: &Path) -> Result<()> {
600    let parent = link_path
601        .parent()
602        .ok_or_else(|| anyhow!("link path {} has no parent", link_path.display()))?;
603    tokio_fs::create_dir_all(parent).await?;
604
605    if let Ok(metadata) = tokio_fs::symlink_metadata(link_path).await
606        && metadata.is_dir()
607    {
608        tokio_fs::remove_dir_all(link_path).await?;
609    }
610
611    let link_name = link_path
612        .file_name()
613        .ok_or_else(|| anyhow!("link path {} has no file name", link_path.display()))?
614        .to_string_lossy()
615        .to_string();
616    let tmp_link = parent.join(format!(".{link_name}.tmp-{}", std::process::id()));
617    let _ = tokio_fs::remove_file(&tmp_link).await;
618    tokio_fs::symlink(target_path, &tmp_link).await?;
619    tokio_fs::rename(&tmp_link, link_path).await?;
620    Ok(())
621}
622
623fn versioned_log_target(alias_path: &Path, version_fs: &str) -> Result<PathBuf> {
624    let parent = alias_path
625        .parent()
626        .ok_or_else(|| anyhow!("log alias {} has no parent", alias_path.display()))?;
627    let file_name = alias_path
628        .file_name()
629        .ok_or_else(|| anyhow!("log alias {} has no file name", alias_path.display()))?
630        .to_string_lossy()
631        .to_string();
632
633    if let Some((base, ext)) = file_name.rsplit_once('.') {
634        Ok(parent.join(format!("{base}-{version_fs}.{ext}")))
635    } else {
636        Ok(parent.join(format!("{file_name}-{version_fs}")))
637    }
638}
639
/// Ensure `alias_path` ends up as a symlink to its versioned log target,
/// returning the target path.
///
/// Migration rules for whatever currently occupies the alias path:
/// - a directory is removed (never a valid log location)
/// - a plain file is renamed to the versioned target when that slot is
///   free, otherwise deleted in favor of the existing versioned file
/// - an existing symlink is left alone here; `create_symlink` repoints it
///   atomically below
async fn prepare_versioned_log_alias(alias_path: &Path, version_fs: &str) -> Result<PathBuf> {
    let target_path = versioned_log_target(alias_path, version_fs)?;
    let parent = alias_path
        .parent()
        .ok_or_else(|| anyhow!("log alias {} has no parent", alias_path.display()))?;
    tokio_fs::create_dir_all(parent).await?;

    if let Ok(metadata) = tokio_fs::symlink_metadata(alias_path).await {
        if metadata.is_dir() {
            tokio_fs::remove_dir_all(alias_path).await?;
        } else if !metadata.file_type().is_symlink() {
            // Legacy un-versioned log file: move it under the versioned name
            // if that path is free, otherwise discard it.
            if tokio_fs::symlink_metadata(&target_path).await.is_err() {
                tokio_fs::rename(alias_path, &target_path).await?;
            } else {
                tokio_fs::remove_file(alias_path).await?;
            }
        }
    }

    create_symlink(alias_path, &target_path).await?;
    Ok(target_path)
}
662
663async fn prepare_versioned_log_aliases(
664    stdout_alias: &Path,
665    stderr_alias: &Path,
666    version_fs: &str,
667) -> Result<(PathBuf, PathBuf)> {
668    let stdout_target = prepare_versioned_log_alias(stdout_alias, version_fs).await?;
669    let stderr_target = prepare_versioned_log_alias(stderr_alias, version_fs).await?;
670    Ok((stdout_target, stderr_target))
671}
672
673fn process_group(node_id: u32, total_nodes: u32) -> ProcessGroup {
674    let genesis_nodes = total_nodes / 2;
675    if node_id <= BOOTSTRAP_NODES.min(genesis_nodes) {
676        ProcessGroup::Validators1
677    } else if node_id <= genesis_nodes {
678        ProcessGroup::Validators2
679    } else {
680        ProcessGroup::Validators3
681    }
682}
683
684fn process_alive(pid: i32) -> bool {
685    match kill(Pid::from_raw(pid), None) {
686        Ok(()) => true,
687        Err(Errno::ESRCH) => false,
688        Err(_) => true,
689    }
690}
691
692fn send_signal(target: i32, signal: Signal) -> Result<()> {
693    match kill(Pid::from_raw(target), signal) {
694        Ok(()) => Ok(()),
695        Err(Errno::ESRCH) => Ok(()),
696        Err(err) => Err(anyhow!(err)),
697    }
698}
699
700fn signal_name(signal: Signal) -> &'static str {
701    match signal {
702        Signal::SIGTERM => "SIGTERM",
703        Signal::SIGKILL => "SIGKILL",
704        _ => "signal",
705    }
706}
707
708async fn is_file(path: &Path) -> bool {
709    tokio_fs::metadata(path)
710        .await
711        .map(|meta| meta.is_file())
712        .unwrap_or(false)
713}
714
715async fn wait_for_pid(pid_handle: &AtomicU32) -> Option<u32> {
716    let deadline = Instant::now() + Duration::from_secs(1);
717    loop {
718        let pid = pid_handle.load(Ordering::SeqCst);
719        if pid != 0 {
720            return Some(pid);
721        }
722        if Instant::now() >= deadline {
723            return None;
724        }
725        sleep(Duration::from_millis(20)).await;
726    }
727}
728
729async fn wait_for_process_exit(pid: i32, timeout: Duration) -> bool {
730    let deadline = Instant::now() + timeout;
731    while Instant::now() < deadline {
732        if !process_alive(pid) {
733            return true;
734        }
735        sleep(Duration::from_millis(100)).await;
736    }
737    !process_alive(pid)
738}
739
/// Resolve the record's current pid. Thin free-function wrapper around
/// `ProcessRecord::current_pid` so call sites in this module read uniformly.
fn current_pid(record: &ProcessRecord) -> Option<u32> {
    record.current_pid()
}