1use crate::assets::{AssetsLayout, BOOTSTRAP_NODES};
2use crate::consensus_key_provider::ConsensusKeyProviderConfig;
3use crate::node_launcher::Launcher;
4use crate::state::{ProcessGroup, ProcessKind, ProcessRecord, ProcessStatus, State};
5use anyhow::{Result, anyhow};
6use nix::errno::Errno;
7use nix::sys::signal::{Signal, kill};
8use nix::unistd::Pid;
9use std::collections::BTreeMap;
10use std::path::{Path, PathBuf};
11use std::sync::{
12 Arc,
13 atomic::{AtomicBool, AtomicU32, Ordering},
14};
15use std::time::{Duration, Instant};
16use time::OffsetDateTime;
17use tokio::fs as tokio_fs;
18use tokio::fs::OpenOptions;
19use tokio::process::Command;
20use tokio::time::sleep;
21
/// A process started by this module, paired with the handle that owns it.
pub struct RunningProcess {
    /// Bookkeeping record; a clone of it is what gets stored in `State::processes`.
    pub record: ProcessRecord,
    /// Owning handle: a direct OS child for sidecars, a launcher task for nodes.
    pub handle: ProcessHandle,
}
27
/// How a started process is owned by this module.
pub enum ProcessHandle {
    /// A directly spawned child process (used for sidecars).
    Child(tokio::process::Child),
    /// A tokio task running a node `Launcher`; it yields the launcher's result.
    Task(tokio::task::JoinHandle<Result<()>>),
}
33
/// Parameters applied to every node launched by [`start`].
pub struct StartPlan {
    /// Log filter handed to each node launcher (`set_rust_log`) and exported as
    /// `RUST_LOG` for each sidecar.
    pub rust_log: String,
    /// Seed passed to `ConsensusKeyProviderConfig::for_layout` for every node.
    pub seed: Arc<str>,
}
39
40pub async fn start(
42 layout: &AssetsLayout,
43 plan: &StartPlan,
44 state: &mut State,
45) -> Result<Vec<RunningProcess>> {
46 let total_nodes = layout.count_nodes().await?;
47 if total_nodes == 0 {
48 return Err(anyhow!(
49 "no nodes found under {}",
50 layout.nodes_dir().display()
51 ));
52 }
53 let node_ids: Vec<u32> = (1..=total_nodes).collect();
54
55 let mut started = Vec::new();
56 for node_id in node_ids {
57 if node_id > total_nodes {
58 let err = anyhow!("node {} exceeds total nodes {}", node_id, total_nodes);
59 return Err(cleanup_started_processes(err, started).await);
60 }
61 match start_node(
62 layout,
63 node_id,
64 total_nodes,
65 &plan.rust_log,
66 Arc::clone(&plan.seed),
67 )
68 .await
69 {
70 Ok(mut records) => started.append(&mut records),
71 Err(err) => return Err(cleanup_started_processes(err, started).await),
72 }
73 }
74
75 state.processes = started.iter().map(|proc| proc.record.clone()).collect();
76 if let Err(err) = state.touch().await {
77 state.processes.clear();
78 return Err(cleanup_started_processes(err, started).await);
79 }
80
81 Ok(started)
82}
83
84async fn cleanup_started_processes(
85 err: anyhow::Error,
86 started: Vec<RunningProcess>,
87) -> anyhow::Error {
88 let cleanup_errors = stop_started_processes(started).await;
89 if cleanup_errors.is_empty() {
90 err
91 } else {
92 anyhow!(
93 "{}; failed to clean up partially started processes: {}",
94 err,
95 cleanup_errors.join("; ")
96 )
97 }
98}
99
100async fn stop_started_processes(processes: Vec<RunningProcess>) -> Vec<String> {
101 let mut errors = Vec::new();
102
103 for running in &processes {
104 if let Some(shutdown) = &running.record.shutdown_handle {
105 shutdown.store(true, Ordering::SeqCst);
106 }
107
108 let Some(pid) = current_pid(&running.record) else {
109 continue;
110 };
111 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
112 errors.push(format!(
113 "failed to send {} to {} (pid {}): {}",
114 signal_name(Signal::SIGTERM),
115 running.record.id,
116 pid,
117 err
118 ));
119 }
120 }
121
122 let deadline = Instant::now() + Duration::from_secs(5);
123 for running in &processes {
124 let Some(pid) = current_pid(&running.record) else {
125 continue;
126 };
127 while process_alive(pid as i32) && Instant::now() < deadline {
128 sleep(Duration::from_millis(100)).await;
129 }
130 if process_alive(pid as i32)
131 && let Err(err) = send_signal(pid as i32, Signal::SIGKILL)
132 {
133 errors.push(format!(
134 "failed to send {} to {} (pid {}): {}",
135 signal_name(Signal::SIGKILL),
136 running.record.id,
137 pid,
138 err
139 ));
140 }
141 }
142
143 for running in processes {
144 match running.handle {
145 ProcessHandle::Child(mut child) => {
146 if tokio::time::timeout(Duration::from_secs(1), child.wait())
147 .await
148 .is_err()
149 {
150 let _ = child.start_kill();
151 let _ = child.wait().await;
152 }
153 }
154 ProcessHandle::Task(mut handle) => {
155 tokio::select! {
156 _ = &mut handle => {}
157 _ = sleep(Duration::from_secs(1)) => {
158 handle.abort();
159 let _ = handle.await;
160 }
161 }
162 }
163 }
164 }
165
166 errors
167}
168
169pub async fn stop(state: &mut State) -> Result<()> {
171 let mut running: Vec<&mut ProcessRecord> = state
172 .processes
173 .iter_mut()
174 .filter(|record| matches!(record.last_status, ProcessStatus::Running))
175 .collect();
176
177 let mut errors = Vec::new();
178
179 for record in &mut running {
180 if let Some(shutdown) = &record.shutdown_handle {
181 shutdown.store(true, Ordering::SeqCst);
182 }
183
184 if let Some(pid) = current_pid(record) {
185 eprintln!(
186 "sending {} to {} (pid {})",
187 signal_name(Signal::SIGTERM),
188 record.id,
189 pid
190 );
191 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
192 errors.push(format!(
193 "failed to send {} to {} (pid {}): {}",
194 signal_name(Signal::SIGTERM),
195 record.id,
196 pid,
197 err
198 ));
199 }
200 }
201 }
202
203 let deadline = Instant::now() + Duration::from_secs(5);
204 for record in &mut running {
205 if let Some(pid) = current_pid(record) {
206 while process_alive(pid as i32) && Instant::now() < deadline {
207 sleep(Duration::from_millis(200)).await;
208 }
209 if process_alive(pid as i32) {
210 eprintln!(
211 "sending {} to {} (pid {})",
212 signal_name(Signal::SIGKILL),
213 record.id,
214 pid
215 );
216 if let Err(err) = send_signal(pid as i32, Signal::SIGKILL) {
217 errors.push(format!(
218 "failed to send {} to {} (pid {}): {}",
219 signal_name(Signal::SIGKILL),
220 record.id,
221 pid,
222 err
223 ));
224 }
225 if !wait_for_process_exit(pid as i32, Duration::from_secs(5)).await {
226 errors.push(format!(
227 "{} (pid {}) still running after SIGKILL",
228 record.id, pid
229 ));
230 record.last_status = ProcessStatus::Unknown;
231 continue;
232 }
233 }
234 record.last_status = ProcessStatus::Stopped;
235 record.stopped_at = Some(OffsetDateTime::now_utc());
236 record.exit_code = None;
237 record.exit_signal = None;
238 continue;
239 }
240 errors.push(format!("missing pid for {} while stopping", record.id));
241 record.last_status = ProcessStatus::Unknown;
242 }
243
244 state.touch().await?;
245 if errors.is_empty() {
246 Ok(())
247 } else {
248 Err(anyhow!(errors.join("\n")))
249 }
250}
251
252pub async fn restart_sidecars(
254 layout: &AssetsLayout,
255 state: &mut State,
256 rust_log: &str,
257) -> Result<Vec<RunningProcess>> {
258 let total_nodes = layout.count_nodes().await?;
259 if total_nodes == 0 {
260 return Err(anyhow!(
261 "no nodes found under {}",
262 layout.nodes_dir().display()
263 ));
264 }
265
266 let mut restarted = Vec::new();
267 let mut errors = Vec::new();
268
269 for node_id in 1..=total_nodes {
270 if let Some(record) = state
271 .processes
272 .iter_mut()
273 .find(|record| record.node_id == node_id && matches!(record.kind, ProcessKind::Sidecar))
274 {
275 if let Some(pid) = current_pid(record) {
276 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
277 errors.push(format!(
278 "failed to send {} to {} (pid {}): {}",
279 signal_name(Signal::SIGTERM),
280 record.id,
281 pid,
282 err
283 ));
284 } else if !wait_for_process_exit(pid as i32, Duration::from_secs(5)).await
285 && let Err(err) = send_signal(pid as i32, Signal::SIGKILL)
286 {
287 errors.push(format!(
288 "failed to send {} to {} (pid {}): {}",
289 signal_name(Signal::SIGKILL),
290 record.id,
291 pid,
292 err
293 ));
294 }
295 }
296 record.last_status = ProcessStatus::Stopped;
297 record.stopped_at = Some(OffsetDateTime::now_utc());
298 record.exit_code = None;
299 record.exit_signal = None;
300 record.pid = None;
301 record.pid_handle = None;
302 record.shutdown_handle = None;
303 }
304
305 match spawn_sidecar(layout, node_id, total_nodes, rust_log).await {
306 Ok(Some(running)) => {
307 if let Some(slot) = state.processes.iter_mut().find(|record| {
308 record.node_id == node_id && matches!(record.kind, ProcessKind::Sidecar)
309 }) {
310 *slot = running.record.clone();
311 } else {
312 state.processes.push(running.record.clone());
313 }
314 restarted.push(running);
315 }
316 Ok(None) => {}
317 Err(err) => {
318 errors.push(format!(
319 "failed to restart sidecar for node-{}: {}",
320 node_id, err
321 ));
322 }
323 }
324 }
325
326 state.touch().await?;
327 if errors.is_empty() {
328 Ok(restarted)
329 } else {
330 Err(anyhow!(errors.join("\n")))
331 }
332}
333
334pub async fn start_added_nodes(
336 layout: &AssetsLayout,
337 state: &mut State,
338 node_ids: &[u32],
339 total_nodes: u32,
340 rust_log: &str,
341 seed: Arc<str>,
342) -> Result<Vec<RunningProcess>> {
343 let mut started = Vec::new();
344
345 for node_id in node_ids {
346 if state.processes.iter().any(|record| {
347 record.node_id == *node_id && matches!(record.last_status, ProcessStatus::Running)
348 }) {
349 let err = anyhow!("node-{} already has running process records", node_id);
350 return Err(cleanup_started_processes(err, started).await);
351 }
352
353 match start_node(layout, *node_id, total_nodes, rust_log, Arc::clone(&seed)).await {
354 Ok(mut records) => started.append(&mut records),
355 Err(err) => return Err(cleanup_started_processes(err, started).await),
356 }
357 }
358
359 let process_ids = started
360 .iter()
361 .map(|proc| proc.record.id.clone())
362 .collect::<Vec<_>>();
363 for running in &started {
364 if let Some(slot) = state.processes.iter_mut().find(|record| {
365 record.node_id == running.record.node_id
366 && std::mem::discriminant(&record.kind)
367 == std::mem::discriminant(&running.record.kind)
368 }) {
369 *slot = running.record.clone();
370 } else {
371 state.processes.push(running.record.clone());
372 }
373 }
374
375 if let Err(err) = state.touch().await {
376 state
377 .processes
378 .retain(|record| !process_ids.iter().any(|id| id == &record.id));
379 return Err(cleanup_started_processes(err, started).await);
380 }
381
382 Ok(started)
383}
384
385async fn start_node(
387 layout: &AssetsLayout,
388 node_id: u32,
389 total_nodes: u32,
390 rust_log: &str,
391 seed: Arc<str>,
392) -> Result<Vec<RunningProcess>> {
393 let mut records = Vec::new();
394
395 let node_record = spawn_node(layout, node_id, total_nodes, rust_log, seed).await?;
396 records.push(node_record);
397
398 match spawn_sidecar(layout, node_id, total_nodes, rust_log).await {
399 Ok(Some(sidecar_record)) => records.push(sidecar_record),
400 Ok(None) => {}
401 Err(err) => return Err(cleanup_started_processes(err, records).await),
402 }
403
404 Ok(records)
405}
406
407async fn spawn_node(
409 layout: &AssetsLayout,
410 node_id: u32,
411 total_nodes: u32,
412 rust_log: &str,
413 seed: Arc<str>,
414) -> Result<RunningProcess> {
415 let node_dir = layout.node_dir(node_id);
416
417 let stdout_path = layout.node_logs_dir(node_id).join("stdout.log");
418 let stderr_path = layout.node_logs_dir(node_id).join("stderr.log");
419 create_log_symlinks(
420 layout,
421 node_id,
422 ProcessKind::Node,
423 &stdout_path,
424 &stderr_path,
425 )
426 .await?;
427
428 let mut launcher = Launcher::new_with_roots(
429 None,
430 layout.node_bin_dir(node_id),
431 layout.node_config_root(node_id),
432 )
433 .await?;
434 launcher.set_log_paths(stdout_path.clone(), stderr_path.clone());
435 launcher.set_cwd(node_dir.clone());
436 launcher.set_hook_context(layout.net_dir(), layout.hooks_dir());
437 launcher.set_rust_log(rust_log.to_string());
438 launcher.set_consensus_key_provider(ConsensusKeyProviderConfig::for_layout(
439 layout, node_id, seed,
440 ));
441
442 let mut env = BTreeMap::new();
443 env.insert(
444 "CASPER_CONFIG_DIR".to_string(),
445 layout
446 .node_config_root(node_id)
447 .to_string_lossy()
448 .to_string(),
449 );
450 launcher.set_envs(env);
451
452 let (command_path, command_args) = launcher.current_command();
453 let child_pid = launcher.child_pid();
454 let shutdown = Arc::new(AtomicBool::new(false));
455 let shutdown_thread = Arc::clone(&shutdown);
456 let handle =
457 tokio::spawn(async move { launcher.run_with_shutdown(shutdown_thread.as_ref()).await });
458 let pid = wait_for_pid(child_pid.as_ref()).await;
459
460 Ok(RunningProcess {
461 record: ProcessRecord {
462 id: format!("node-{}", node_id),
463 node_id,
464 kind: ProcessKind::Node,
465 group: process_group(node_id, total_nodes),
466 command: command_path.to_string_lossy().to_string(),
467 args: command_args,
468 cwd: node_dir.to_string_lossy().to_string(),
469 pid,
470 pid_handle: Some(child_pid),
471 shutdown_handle: Some(shutdown),
472 stdout_path: stdout_path.to_string_lossy().to_string(),
473 stderr_path: stderr_path.to_string_lossy().to_string(),
474 started_at: Some(OffsetDateTime::now_utc()),
475 stopped_at: None,
476 exit_code: None,
477 exit_signal: None,
478 last_status: ProcessStatus::Running,
479 },
480 handle: ProcessHandle::Task(handle),
481 })
482}
483
484async fn spawn_sidecar(
486 layout: &AssetsLayout,
487 node_id: u32,
488 total_nodes: u32,
489 rust_log: &str,
490) -> Result<Option<RunningProcess>> {
491 let version_dir = layout.latest_protocol_version_dir(node_id).await?;
492 let command_path = layout
493 .node_bin_dir(node_id)
494 .join(&version_dir)
495 .join("casper-sidecar");
496 let config_path = layout
497 .node_config_root(node_id)
498 .join(&version_dir)
499 .join("sidecar.toml");
500
501 if !is_file(&command_path).await || !is_file(&config_path).await {
502 return Ok(None);
503 }
504
505 let node_dir = layout.node_dir(node_id);
506 let logs_dir = layout.node_logs_dir(node_id);
507 let stdout_alias_path = logs_dir.join("sidecar-stdout.log");
508 let stderr_alias_path = logs_dir.join("sidecar-stderr.log");
509 let (stdout_target_path, stderr_target_path) =
510 prepare_versioned_log_aliases(&stdout_alias_path, &stderr_alias_path, &version_dir).await?;
511 create_log_symlinks(
512 layout,
513 node_id,
514 ProcessKind::Sidecar,
515 &stdout_alias_path,
516 &stderr_alias_path,
517 )
518 .await?;
519
520 let args = vec![
521 "--path-to-config".to_string(),
522 config_path.to_string_lossy().to_string(),
523 ];
524
525 let mut env = BTreeMap::new();
526 env.insert("RUST_LOG".to_string(), rust_log.to_string());
527
528 let child = spawn_process(
529 &command_path,
530 &args,
531 &env,
532 &node_dir,
533 &stdout_target_path,
534 &stderr_target_path,
535 )
536 .await?;
537 let pid = child.id();
538
539 Ok(Some(RunningProcess {
540 record: ProcessRecord {
541 id: format!("sidecar-{}", node_id),
542 node_id,
543 kind: ProcessKind::Sidecar,
544 group: process_group(node_id, total_nodes),
545 command: command_path.to_string_lossy().to_string(),
546 args,
547 cwd: node_dir.to_string_lossy().to_string(),
548 pid,
549 pid_handle: None,
550 shutdown_handle: None,
551 stdout_path: stdout_alias_path.to_string_lossy().to_string(),
552 stderr_path: stderr_alias_path.to_string_lossy().to_string(),
553 started_at: Some(OffsetDateTime::now_utc()),
554 stopped_at: None,
555 exit_code: None,
556 exit_signal: None,
557 last_status: ProcessStatus::Running,
558 },
559 handle: ProcessHandle::Child(child),
560 }))
561}
562
563async fn spawn_process(
565 command: &Path,
566 args: &[String],
567 env: &BTreeMap<String, String>,
568 cwd: &Path,
569 stdout_path: &Path,
570 stderr_path: &Path,
571) -> Result<tokio::process::Child> {
572 let stdout = OpenOptions::new()
573 .create(true)
574 .append(true)
575 .open(stdout_path)
576 .await?;
577 let stderr = OpenOptions::new()
578 .create(true)
579 .append(true)
580 .open(stderr_path)
581 .await?;
582 let stdout = stdout.into_std().await;
583 let stderr = stderr.into_std().await;
584
585 let mut cmd = Command::new(command);
586 cmd.args(args)
587 .envs(env)
588 .current_dir(cwd)
589 .stdout(stdout)
590 .stderr(stderr);
591
592 Ok(cmd.spawn()?)
593}
594
595async fn create_log_symlinks(
596 layout: &AssetsLayout,
597 node_id: u32,
598 kind: ProcessKind,
599 stdout_path: &Path,
600 stderr_path: &Path,
601) -> Result<()> {
602 let data_dir = layout.net_dir();
603 tokio_fs::create_dir_all(&data_dir).await?;
604 let prefix = match kind {
605 ProcessKind::Node => format!("node-{}", node_id),
606 ProcessKind::Sidecar => format!("sidecar-{}", node_id),
607 };
608 let stdout_link = data_dir.join(format!("{}.stdout", prefix));
609 let stderr_link = data_dir.join(format!("{}.stderr", prefix));
610 create_symlink(&stdout_link, stdout_path).await?;
611 create_symlink(&stderr_link, stderr_path).await?;
612 Ok(())
613}
614
615async fn create_symlink(link_path: &Path, target_path: &Path) -> Result<()> {
616 let parent = link_path
617 .parent()
618 .ok_or_else(|| anyhow!("link path {} has no parent", link_path.display()))?;
619 tokio_fs::create_dir_all(parent).await?;
620
621 if let Ok(metadata) = tokio_fs::symlink_metadata(link_path).await
622 && metadata.is_dir()
623 {
624 tokio_fs::remove_dir_all(link_path).await?;
625 }
626
627 let link_name = link_path
628 .file_name()
629 .ok_or_else(|| anyhow!("link path {} has no file name", link_path.display()))?
630 .to_string_lossy()
631 .to_string();
632 let tmp_link = parent.join(format!(".{link_name}.tmp-{}", std::process::id()));
633 let _ = tokio_fs::remove_file(&tmp_link).await;
634 tokio_fs::symlink(target_path, &tmp_link).await?;
635 tokio_fs::rename(&tmp_link, link_path).await?;
636 Ok(())
637}
638
639fn versioned_log_target(alias_path: &Path, version_fs: &str) -> Result<PathBuf> {
640 let parent = alias_path
641 .parent()
642 .ok_or_else(|| anyhow!("log alias {} has no parent", alias_path.display()))?;
643 let file_name = alias_path
644 .file_name()
645 .ok_or_else(|| anyhow!("log alias {} has no file name", alias_path.display()))?
646 .to_string_lossy()
647 .to_string();
648
649 if let Some((base, ext)) = file_name.rsplit_once('.') {
650 Ok(parent.join(format!("{base}-{version_fs}.{ext}")))
651 } else {
652 Ok(parent.join(format!("{file_name}-{version_fs}")))
653 }
654}
655
656async fn prepare_versioned_log_alias(alias_path: &Path, version_fs: &str) -> Result<PathBuf> {
657 let target_path = versioned_log_target(alias_path, version_fs)?;
658 let parent = alias_path
659 .parent()
660 .ok_or_else(|| anyhow!("log alias {} has no parent", alias_path.display()))?;
661 tokio_fs::create_dir_all(parent).await?;
662
663 if let Ok(metadata) = tokio_fs::symlink_metadata(alias_path).await {
664 if metadata.is_dir() {
665 tokio_fs::remove_dir_all(alias_path).await?;
666 } else if !metadata.file_type().is_symlink() {
667 if tokio_fs::symlink_metadata(&target_path).await.is_err() {
668 tokio_fs::rename(alias_path, &target_path).await?;
669 } else {
670 tokio_fs::remove_file(alias_path).await?;
671 }
672 }
673 }
674
675 create_symlink(alias_path, &target_path).await?;
676 Ok(target_path)
677}
678
679async fn prepare_versioned_log_aliases(
680 stdout_alias: &Path,
681 stderr_alias: &Path,
682 version_fs: &str,
683) -> Result<(PathBuf, PathBuf)> {
684 let stdout_target = prepare_versioned_log_alias(stdout_alias, version_fs).await?;
685 let stderr_target = prepare_versioned_log_alias(stderr_alias, version_fs).await?;
686 Ok((stdout_target, stderr_target))
687}
688
689fn process_group(node_id: u32, total_nodes: u32) -> ProcessGroup {
690 let genesis_nodes = total_nodes / 2;
691 if node_id <= BOOTSTRAP_NODES.min(genesis_nodes) {
692 ProcessGroup::Validators1
693 } else if node_id <= genesis_nodes {
694 ProcessGroup::Validators2
695 } else {
696 ProcessGroup::Validators3
697 }
698}
699
700fn process_alive(pid: i32) -> bool {
701 match kill(Pid::from_raw(pid), None) {
702 Ok(()) => true,
703 Err(Errno::ESRCH) => false,
704 Err(_) => true,
705 }
706}
707
708fn send_signal(target: i32, signal: Signal) -> Result<()> {
709 match kill(Pid::from_raw(target), signal) {
710 Ok(()) => Ok(()),
711 Err(Errno::ESRCH) => Ok(()),
712 Err(err) => Err(anyhow!(err)),
713 }
714}
715
716fn signal_name(signal: Signal) -> &'static str {
717 match signal {
718 Signal::SIGTERM => "SIGTERM",
719 Signal::SIGKILL => "SIGKILL",
720 _ => "signal",
721 }
722}
723
724async fn is_file(path: &Path) -> bool {
725 tokio_fs::metadata(path)
726 .await
727 .map(|meta| meta.is_file())
728 .unwrap_or(false)
729}
730
731async fn wait_for_pid(pid_handle: &AtomicU32) -> Option<u32> {
732 let deadline = Instant::now() + Duration::from_secs(1);
733 loop {
734 let pid = pid_handle.load(Ordering::SeqCst);
735 if pid != 0 {
736 return Some(pid);
737 }
738 if Instant::now() >= deadline {
739 return None;
740 }
741 sleep(Duration::from_millis(20)).await;
742 }
743}
744
745async fn wait_for_process_exit(pid: i32, timeout: Duration) -> bool {
746 let deadline = Instant::now() + timeout;
747 while Instant::now() < deadline {
748 if !process_alive(pid) {
749 return true;
750 }
751 sleep(Duration::from_millis(100)).await;
752 }
753 !process_alive(pid)
754}
755
756fn current_pid(record: &ProcessRecord) -> Option<u32> {
757 record.current_pid()
758}