1use crate::assets::{AssetsLayout, BOOTSTRAP_NODES};
2use crate::node_launcher::Launcher;
3use crate::state::{ProcessGroup, ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{Result, anyhow};
5use nix::errno::Errno;
6use nix::sys::signal::{Signal, kill};
7use nix::unistd::Pid;
8use std::collections::BTreeMap;
9use std::path::{Path, PathBuf};
10use std::sync::{
11 Arc,
12 atomic::{AtomicBool, AtomicU32, Ordering},
13};
14use std::time::{Duration, Instant};
15use time::OffsetDateTime;
16use tokio::fs as tokio_fs;
17use tokio::fs::OpenOptions;
18use tokio::process::Command;
19use tokio::time::sleep;
20
/// A launched process paired with the handle that owns it.
pub struct RunningProcess {
    /// Bookkeeping record for the process; clones of it are stored in the
    /// state's process list by the start/restart functions in this module.
    pub record: ProcessRecord,
    /// Handle to the spawned child or to the task that drives the process.
    pub handle: ProcessHandle,
}
26
/// The two ways this module keeps hold of something it started.
pub enum ProcessHandle {
    /// A directly spawned OS child process (used for sidecars).
    Child(tokio::process::Child),
    /// A tokio task that runs the process on our behalf (used for nodes,
    /// which are driven through the launcher).
    Task(tokio::task::JoinHandle<Result<()>>),
}
32
/// Options that apply to every process started by [`start`].
pub struct StartPlan {
    /// Log filter handed to the node launcher and exported as `RUST_LOG`
    /// for sidecars.
    pub rust_log: String,
}
37
38pub async fn start(
40 layout: &AssetsLayout,
41 plan: &StartPlan,
42 state: &mut State,
43) -> Result<Vec<RunningProcess>> {
44 let total_nodes = layout.count_nodes().await?;
45 if total_nodes == 0 {
46 return Err(anyhow!(
47 "no nodes found under {}",
48 layout.nodes_dir().display()
49 ));
50 }
51 let node_ids: Vec<u32> = (1..=total_nodes).collect();
52
53 let mut started = Vec::new();
54 for node_id in node_ids {
55 if node_id > total_nodes {
56 let err = anyhow!("node {} exceeds total nodes {}", node_id, total_nodes);
57 return Err(cleanup_started_processes(err, started).await);
58 }
59 match start_node(layout, node_id, total_nodes, &plan.rust_log).await {
60 Ok(mut records) => started.append(&mut records),
61 Err(err) => return Err(cleanup_started_processes(err, started).await),
62 }
63 }
64
65 state.processes = started.iter().map(|proc| proc.record.clone()).collect();
66 if let Err(err) = state.touch().await {
67 state.processes.clear();
68 return Err(cleanup_started_processes(err, started).await);
69 }
70
71 Ok(started)
72}
73
74async fn cleanup_started_processes(
75 err: anyhow::Error,
76 started: Vec<RunningProcess>,
77) -> anyhow::Error {
78 let cleanup_errors = stop_started_processes(started).await;
79 if cleanup_errors.is_empty() {
80 err
81 } else {
82 anyhow!(
83 "{}; failed to clean up partially started processes: {}",
84 err,
85 cleanup_errors.join("; ")
86 )
87 }
88}
89
90async fn stop_started_processes(processes: Vec<RunningProcess>) -> Vec<String> {
91 let mut errors = Vec::new();
92
93 for running in &processes {
94 if let Some(shutdown) = &running.record.shutdown_handle {
95 shutdown.store(true, Ordering::SeqCst);
96 }
97
98 let Some(pid) = current_pid(&running.record) else {
99 continue;
100 };
101 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
102 errors.push(format!(
103 "failed to send {} to {} (pid {}): {}",
104 signal_name(Signal::SIGTERM),
105 running.record.id,
106 pid,
107 err
108 ));
109 }
110 }
111
112 let deadline = Instant::now() + Duration::from_secs(5);
113 for running in &processes {
114 let Some(pid) = current_pid(&running.record) else {
115 continue;
116 };
117 while process_alive(pid as i32) && Instant::now() < deadline {
118 sleep(Duration::from_millis(100)).await;
119 }
120 if process_alive(pid as i32)
121 && let Err(err) = send_signal(pid as i32, Signal::SIGKILL)
122 {
123 errors.push(format!(
124 "failed to send {} to {} (pid {}): {}",
125 signal_name(Signal::SIGKILL),
126 running.record.id,
127 pid,
128 err
129 ));
130 }
131 }
132
133 for running in processes {
134 match running.handle {
135 ProcessHandle::Child(mut child) => {
136 if tokio::time::timeout(Duration::from_secs(1), child.wait())
137 .await
138 .is_err()
139 {
140 let _ = child.start_kill();
141 let _ = child.wait().await;
142 }
143 }
144 ProcessHandle::Task(mut handle) => {
145 tokio::select! {
146 _ = &mut handle => {}
147 _ = sleep(Duration::from_secs(1)) => {
148 handle.abort();
149 let _ = handle.await;
150 }
151 }
152 }
153 }
154 }
155
156 errors
157}
158
159pub async fn stop(state: &mut State) -> Result<()> {
161 let mut running: Vec<&mut ProcessRecord> = state
162 .processes
163 .iter_mut()
164 .filter(|record| matches!(record.last_status, ProcessStatus::Running))
165 .collect();
166
167 let mut errors = Vec::new();
168
169 for record in &mut running {
170 if let Some(shutdown) = &record.shutdown_handle {
171 shutdown.store(true, Ordering::SeqCst);
172 }
173
174 if let Some(pid) = current_pid(record) {
175 eprintln!(
176 "sending {} to {} (pid {})",
177 signal_name(Signal::SIGTERM),
178 record.id,
179 pid
180 );
181 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
182 errors.push(format!(
183 "failed to send {} to {} (pid {}): {}",
184 signal_name(Signal::SIGTERM),
185 record.id,
186 pid,
187 err
188 ));
189 }
190 }
191 }
192
193 let deadline = Instant::now() + Duration::from_secs(5);
194 for record in &mut running {
195 if let Some(pid) = current_pid(record) {
196 while process_alive(pid as i32) && Instant::now() < deadline {
197 sleep(Duration::from_millis(200)).await;
198 }
199 if process_alive(pid as i32) {
200 eprintln!(
201 "sending {} to {} (pid {})",
202 signal_name(Signal::SIGKILL),
203 record.id,
204 pid
205 );
206 if let Err(err) = send_signal(pid as i32, Signal::SIGKILL) {
207 errors.push(format!(
208 "failed to send {} to {} (pid {}): {}",
209 signal_name(Signal::SIGKILL),
210 record.id,
211 pid,
212 err
213 ));
214 }
215 if process_alive(pid as i32) {
216 errors.push(format!(
217 "{} (pid {}) still running after SIGKILL",
218 record.id, pid
219 ));
220 record.last_status = ProcessStatus::Unknown;
221 continue;
222 }
223 }
224 record.last_status = ProcessStatus::Stopped;
225 record.stopped_at = Some(OffsetDateTime::now_utc());
226 record.exit_code = None;
227 record.exit_signal = None;
228 continue;
229 }
230 errors.push(format!("missing pid for {} while stopping", record.id));
231 record.last_status = ProcessStatus::Unknown;
232 }
233
234 state.touch().await?;
235 if errors.is_empty() {
236 Ok(())
237 } else {
238 Err(anyhow!(errors.join("\n")))
239 }
240}
241
242pub async fn restart_sidecars(
244 layout: &AssetsLayout,
245 state: &mut State,
246 rust_log: &str,
247) -> Result<Vec<RunningProcess>> {
248 let total_nodes = layout.count_nodes().await?;
249 if total_nodes == 0 {
250 return Err(anyhow!(
251 "no nodes found under {}",
252 layout.nodes_dir().display()
253 ));
254 }
255
256 let mut restarted = Vec::new();
257 let mut errors = Vec::new();
258
259 for node_id in 1..=total_nodes {
260 if let Some(record) = state
261 .processes
262 .iter_mut()
263 .find(|record| record.node_id == node_id && matches!(record.kind, ProcessKind::Sidecar))
264 {
265 if let Some(pid) = current_pid(record) {
266 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
267 errors.push(format!(
268 "failed to send {} to {} (pid {}): {}",
269 signal_name(Signal::SIGTERM),
270 record.id,
271 pid,
272 err
273 ));
274 } else if !wait_for_process_exit(pid as i32, Duration::from_secs(5)).await
275 && let Err(err) = send_signal(pid as i32, Signal::SIGKILL)
276 {
277 errors.push(format!(
278 "failed to send {} to {} (pid {}): {}",
279 signal_name(Signal::SIGKILL),
280 record.id,
281 pid,
282 err
283 ));
284 }
285 }
286 record.last_status = ProcessStatus::Stopped;
287 record.stopped_at = Some(OffsetDateTime::now_utc());
288 record.exit_code = None;
289 record.exit_signal = None;
290 record.pid = None;
291 record.pid_handle = None;
292 record.shutdown_handle = None;
293 }
294
295 match spawn_sidecar(layout, node_id, total_nodes, rust_log).await {
296 Ok(Some(running)) => {
297 if let Some(slot) = state.processes.iter_mut().find(|record| {
298 record.node_id == node_id && matches!(record.kind, ProcessKind::Sidecar)
299 }) {
300 *slot = running.record.clone();
301 } else {
302 state.processes.push(running.record.clone());
303 }
304 restarted.push(running);
305 }
306 Ok(None) => {}
307 Err(err) => {
308 errors.push(format!(
309 "failed to restart sidecar for node-{}: {}",
310 node_id, err
311 ));
312 }
313 }
314 }
315
316 state.touch().await?;
317 if errors.is_empty() {
318 Ok(restarted)
319 } else {
320 Err(anyhow!(errors.join("\n")))
321 }
322}
323
324pub async fn start_added_nodes(
326 layout: &AssetsLayout,
327 state: &mut State,
328 node_ids: &[u32],
329 total_nodes: u32,
330 rust_log: &str,
331) -> Result<Vec<RunningProcess>> {
332 let mut started = Vec::new();
333
334 for node_id in node_ids {
335 if state.processes.iter().any(|record| {
336 record.node_id == *node_id && matches!(record.last_status, ProcessStatus::Running)
337 }) {
338 let err = anyhow!("node-{} already has running process records", node_id);
339 return Err(cleanup_started_processes(err, started).await);
340 }
341
342 match start_node(layout, *node_id, total_nodes, rust_log).await {
343 Ok(mut records) => started.append(&mut records),
344 Err(err) => return Err(cleanup_started_processes(err, started).await),
345 }
346 }
347
348 let process_ids = started
349 .iter()
350 .map(|proc| proc.record.id.clone())
351 .collect::<Vec<_>>();
352 for running in &started {
353 if let Some(slot) = state.processes.iter_mut().find(|record| {
354 record.node_id == running.record.node_id
355 && std::mem::discriminant(&record.kind)
356 == std::mem::discriminant(&running.record.kind)
357 }) {
358 *slot = running.record.clone();
359 } else {
360 state.processes.push(running.record.clone());
361 }
362 }
363
364 if let Err(err) = state.touch().await {
365 state
366 .processes
367 .retain(|record| !process_ids.iter().any(|id| id == &record.id));
368 return Err(cleanup_started_processes(err, started).await);
369 }
370
371 Ok(started)
372}
373
374async fn start_node(
376 layout: &AssetsLayout,
377 node_id: u32,
378 total_nodes: u32,
379 rust_log: &str,
380) -> Result<Vec<RunningProcess>> {
381 let mut records = Vec::new();
382
383 let node_record = spawn_node(layout, node_id, total_nodes, rust_log).await?;
384 records.push(node_record);
385
386 match spawn_sidecar(layout, node_id, total_nodes, rust_log).await {
387 Ok(Some(sidecar_record)) => records.push(sidecar_record),
388 Ok(None) => {}
389 Err(err) => return Err(cleanup_started_processes(err, records).await),
390 }
391
392 Ok(records)
393}
394
395async fn spawn_node(
397 layout: &AssetsLayout,
398 node_id: u32,
399 total_nodes: u32,
400 rust_log: &str,
401) -> Result<RunningProcess> {
402 let node_dir = layout.node_dir(node_id);
403
404 let stdout_path = layout.node_logs_dir(node_id).join("stdout.log");
405 let stderr_path = layout.node_logs_dir(node_id).join("stderr.log");
406 create_log_symlinks(
407 layout,
408 node_id,
409 ProcessKind::Node,
410 &stdout_path,
411 &stderr_path,
412 )
413 .await?;
414
415 let mut launcher = Launcher::new_with_roots(
416 None,
417 layout.node_bin_dir(node_id),
418 layout.node_config_root(node_id),
419 )
420 .await?;
421 launcher.set_log_paths(stdout_path.clone(), stderr_path.clone());
422 launcher.set_cwd(node_dir.clone());
423 launcher.set_hook_context(layout.net_dir(), layout.hooks_dir());
424 launcher.set_rust_log(rust_log.to_string());
425
426 let mut env = BTreeMap::new();
427 env.insert(
428 "CASPER_CONFIG_DIR".to_string(),
429 layout
430 .node_config_root(node_id)
431 .to_string_lossy()
432 .to_string(),
433 );
434 launcher.set_envs(env);
435
436 let (command_path, command_args) = launcher.current_command();
437 let child_pid = launcher.child_pid();
438 let shutdown = Arc::new(AtomicBool::new(false));
439 let shutdown_thread = Arc::clone(&shutdown);
440 let handle =
441 tokio::spawn(async move { launcher.run_with_shutdown(shutdown_thread.as_ref()).await });
442 let pid = wait_for_pid(child_pid.as_ref()).await;
443
444 Ok(RunningProcess {
445 record: ProcessRecord {
446 id: format!("node-{}", node_id),
447 node_id,
448 kind: ProcessKind::Node,
449 group: process_group(node_id, total_nodes),
450 command: command_path.to_string_lossy().to_string(),
451 args: command_args,
452 cwd: node_dir.to_string_lossy().to_string(),
453 pid,
454 pid_handle: Some(child_pid),
455 shutdown_handle: Some(shutdown),
456 stdout_path: stdout_path.to_string_lossy().to_string(),
457 stderr_path: stderr_path.to_string_lossy().to_string(),
458 started_at: Some(OffsetDateTime::now_utc()),
459 stopped_at: None,
460 exit_code: None,
461 exit_signal: None,
462 last_status: ProcessStatus::Running,
463 },
464 handle: ProcessHandle::Task(handle),
465 })
466}
467
468async fn spawn_sidecar(
470 layout: &AssetsLayout,
471 node_id: u32,
472 total_nodes: u32,
473 rust_log: &str,
474) -> Result<Option<RunningProcess>> {
475 let version_dir = layout.latest_protocol_version_dir(node_id).await?;
476 let command_path = layout
477 .node_bin_dir(node_id)
478 .join(&version_dir)
479 .join("casper-sidecar");
480 let config_path = layout
481 .node_config_root(node_id)
482 .join(&version_dir)
483 .join("sidecar.toml");
484
485 if !is_file(&command_path).await || !is_file(&config_path).await {
486 return Ok(None);
487 }
488
489 let node_dir = layout.node_dir(node_id);
490 let logs_dir = layout.node_logs_dir(node_id);
491 let stdout_alias_path = logs_dir.join("sidecar-stdout.log");
492 let stderr_alias_path = logs_dir.join("sidecar-stderr.log");
493 let (stdout_target_path, stderr_target_path) =
494 prepare_versioned_log_aliases(&stdout_alias_path, &stderr_alias_path, &version_dir).await?;
495 create_log_symlinks(
496 layout,
497 node_id,
498 ProcessKind::Sidecar,
499 &stdout_alias_path,
500 &stderr_alias_path,
501 )
502 .await?;
503
504 let args = vec![
505 "--path-to-config".to_string(),
506 config_path.to_string_lossy().to_string(),
507 ];
508
509 let mut env = BTreeMap::new();
510 env.insert("RUST_LOG".to_string(), rust_log.to_string());
511
512 let child = spawn_process(
513 &command_path,
514 &args,
515 &env,
516 &node_dir,
517 &stdout_target_path,
518 &stderr_target_path,
519 )
520 .await?;
521 let pid = child.id();
522
523 Ok(Some(RunningProcess {
524 record: ProcessRecord {
525 id: format!("sidecar-{}", node_id),
526 node_id,
527 kind: ProcessKind::Sidecar,
528 group: process_group(node_id, total_nodes),
529 command: command_path.to_string_lossy().to_string(),
530 args,
531 cwd: node_dir.to_string_lossy().to_string(),
532 pid,
533 pid_handle: None,
534 shutdown_handle: None,
535 stdout_path: stdout_alias_path.to_string_lossy().to_string(),
536 stderr_path: stderr_alias_path.to_string_lossy().to_string(),
537 started_at: Some(OffsetDateTime::now_utc()),
538 stopped_at: None,
539 exit_code: None,
540 exit_signal: None,
541 last_status: ProcessStatus::Running,
542 },
543 handle: ProcessHandle::Child(child),
544 }))
545}
546
547async fn spawn_process(
549 command: &Path,
550 args: &[String],
551 env: &BTreeMap<String, String>,
552 cwd: &Path,
553 stdout_path: &Path,
554 stderr_path: &Path,
555) -> Result<tokio::process::Child> {
556 let stdout = OpenOptions::new()
557 .create(true)
558 .append(true)
559 .open(stdout_path)
560 .await?;
561 let stderr = OpenOptions::new()
562 .create(true)
563 .append(true)
564 .open(stderr_path)
565 .await?;
566 let stdout = stdout.into_std().await;
567 let stderr = stderr.into_std().await;
568
569 let mut cmd = Command::new(command);
570 cmd.args(args)
571 .envs(env)
572 .current_dir(cwd)
573 .stdout(stdout)
574 .stderr(stderr);
575
576 Ok(cmd.spawn()?)
577}
578
579async fn create_log_symlinks(
580 layout: &AssetsLayout,
581 node_id: u32,
582 kind: ProcessKind,
583 stdout_path: &Path,
584 stderr_path: &Path,
585) -> Result<()> {
586 let data_dir = layout.net_dir();
587 tokio_fs::create_dir_all(&data_dir).await?;
588 let prefix = match kind {
589 ProcessKind::Node => format!("node-{}", node_id),
590 ProcessKind::Sidecar => format!("sidecar-{}", node_id),
591 };
592 let stdout_link = data_dir.join(format!("{}.stdout", prefix));
593 let stderr_link = data_dir.join(format!("{}.stderr", prefix));
594 create_symlink(&stdout_link, stdout_path).await?;
595 create_symlink(&stderr_link, stderr_path).await?;
596 Ok(())
597}
598
599async fn create_symlink(link_path: &Path, target_path: &Path) -> Result<()> {
600 let parent = link_path
601 .parent()
602 .ok_or_else(|| anyhow!("link path {} has no parent", link_path.display()))?;
603 tokio_fs::create_dir_all(parent).await?;
604
605 if let Ok(metadata) = tokio_fs::symlink_metadata(link_path).await
606 && metadata.is_dir()
607 {
608 tokio_fs::remove_dir_all(link_path).await?;
609 }
610
611 let link_name = link_path
612 .file_name()
613 .ok_or_else(|| anyhow!("link path {} has no file name", link_path.display()))?
614 .to_string_lossy()
615 .to_string();
616 let tmp_link = parent.join(format!(".{link_name}.tmp-{}", std::process::id()));
617 let _ = tokio_fs::remove_file(&tmp_link).await;
618 tokio_fs::symlink(target_path, &tmp_link).await?;
619 tokio_fs::rename(&tmp_link, link_path).await?;
620 Ok(())
621}
622
623fn versioned_log_target(alias_path: &Path, version_fs: &str) -> Result<PathBuf> {
624 let parent = alias_path
625 .parent()
626 .ok_or_else(|| anyhow!("log alias {} has no parent", alias_path.display()))?;
627 let file_name = alias_path
628 .file_name()
629 .ok_or_else(|| anyhow!("log alias {} has no file name", alias_path.display()))?
630 .to_string_lossy()
631 .to_string();
632
633 if let Some((base, ext)) = file_name.rsplit_once('.') {
634 Ok(parent.join(format!("{base}-{version_fs}.{ext}")))
635 } else {
636 Ok(parent.join(format!("{file_name}-{version_fs}")))
637 }
638}
639
640async fn prepare_versioned_log_alias(alias_path: &Path, version_fs: &str) -> Result<PathBuf> {
641 let target_path = versioned_log_target(alias_path, version_fs)?;
642 let parent = alias_path
643 .parent()
644 .ok_or_else(|| anyhow!("log alias {} has no parent", alias_path.display()))?;
645 tokio_fs::create_dir_all(parent).await?;
646
647 if let Ok(metadata) = tokio_fs::symlink_metadata(alias_path).await {
648 if metadata.is_dir() {
649 tokio_fs::remove_dir_all(alias_path).await?;
650 } else if !metadata.file_type().is_symlink() {
651 if tokio_fs::symlink_metadata(&target_path).await.is_err() {
652 tokio_fs::rename(alias_path, &target_path).await?;
653 } else {
654 tokio_fs::remove_file(alias_path).await?;
655 }
656 }
657 }
658
659 create_symlink(alias_path, &target_path).await?;
660 Ok(target_path)
661}
662
663async fn prepare_versioned_log_aliases(
664 stdout_alias: &Path,
665 stderr_alias: &Path,
666 version_fs: &str,
667) -> Result<(PathBuf, PathBuf)> {
668 let stdout_target = prepare_versioned_log_alias(stdout_alias, version_fs).await?;
669 let stderr_target = prepare_versioned_log_alias(stderr_alias, version_fs).await?;
670 Ok((stdout_target, stderr_target))
671}
672
673fn process_group(node_id: u32, total_nodes: u32) -> ProcessGroup {
674 let genesis_nodes = total_nodes / 2;
675 if node_id <= BOOTSTRAP_NODES.min(genesis_nodes) {
676 ProcessGroup::Validators1
677 } else if node_id <= genesis_nodes {
678 ProcessGroup::Validators2
679 } else {
680 ProcessGroup::Validators3
681 }
682}
683
684fn process_alive(pid: i32) -> bool {
685 match kill(Pid::from_raw(pid), None) {
686 Ok(()) => true,
687 Err(Errno::ESRCH) => false,
688 Err(_) => true,
689 }
690}
691
692fn send_signal(target: i32, signal: Signal) -> Result<()> {
693 match kill(Pid::from_raw(target), signal) {
694 Ok(()) => Ok(()),
695 Err(Errno::ESRCH) => Ok(()),
696 Err(err) => Err(anyhow!(err)),
697 }
698}
699
700fn signal_name(signal: Signal) -> &'static str {
701 match signal {
702 Signal::SIGTERM => "SIGTERM",
703 Signal::SIGKILL => "SIGKILL",
704 _ => "signal",
705 }
706}
707
708async fn is_file(path: &Path) -> bool {
709 tokio_fs::metadata(path)
710 .await
711 .map(|meta| meta.is_file())
712 .unwrap_or(false)
713}
714
715async fn wait_for_pid(pid_handle: &AtomicU32) -> Option<u32> {
716 let deadline = Instant::now() + Duration::from_secs(1);
717 loop {
718 let pid = pid_handle.load(Ordering::SeqCst);
719 if pid != 0 {
720 return Some(pid);
721 }
722 if Instant::now() >= deadline {
723 return None;
724 }
725 sleep(Duration::from_millis(20)).await;
726 }
727}
728
729async fn wait_for_process_exit(pid: i32, timeout: Duration) -> bool {
730 let deadline = Instant::now() + timeout;
731 while Instant::now() < deadline {
732 if !process_alive(pid) {
733 return true;
734 }
735 sleep(Duration::from_millis(100)).await;
736 }
737 !process_alive(pid)
738}
739
740fn current_pid(record: &ProcessRecord) -> Option<u32> {
741 record.current_pid()
742}