1use crate::assets::{AssetsLayout, BOOTSTRAP_NODES};
2use crate::node_launcher::Launcher;
3use crate::state::{ProcessGroup, ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{anyhow, Result};
5use nix::errno::Errno;
6use nix::sys::signal::{kill, Signal};
7use nix::unistd::Pid;
8use std::collections::BTreeMap;
9use std::path::Path;
10use std::sync::{
11 atomic::{AtomicBool, AtomicU32, Ordering},
12 Arc,
13};
14use std::time::{Duration, Instant};
15use time::OffsetDateTime;
16use tokio::fs as tokio_fs;
17use tokio::fs::OpenOptions;
18use tokio::process::Command;
19use tokio::time::sleep;
20
/// A process launched by this module: its bookkeeping record together with
/// the handle through which the launcher still owns it.
pub struct RunningProcess {
    /// Bookkeeping data for the process; `start` clones this into `State::processes`.
    pub record: ProcessRecord,
    /// Handle to the spawned child process or to the tokio task driving it.
    pub handle: ProcessHandle,
}
26
/// How a launched process is held on to by the launcher.
pub enum ProcessHandle {
    /// A directly spawned child process (used for sidecars in this file).
    Child(tokio::process::Child),
    /// A tokio task that runs the process on our behalf (used for nodes, whose
    /// `Launcher` runs inside the task).
    Task(tokio::task::JoinHandle<Result<()>>),
}
32
/// Options applied to every process launched by `start`.
pub struct StartPlan {
    /// Log filter handed to each node launcher and exported to each sidecar as
    /// the `RUST_LOG` environment variable.
    pub rust_log: String,
}
37
38pub async fn start(
40 layout: &AssetsLayout,
41 plan: &StartPlan,
42 state: &mut State,
43) -> Result<Vec<RunningProcess>> {
44 let total_nodes = layout.count_nodes().await?;
45 if total_nodes == 0 {
46 return Err(anyhow!(
47 "no nodes found under {}",
48 layout.nodes_dir().display()
49 ));
50 }
51 let node_ids: Vec<u32> = (1..=total_nodes).collect();
52
53 let mut started = Vec::new();
54 for node_id in node_ids {
55 if node_id > total_nodes {
56 return Err(anyhow!(
57 "node {} exceeds total nodes {}",
58 node_id,
59 total_nodes
60 ));
61 }
62 let mut records = start_node(layout, node_id, total_nodes, &plan.rust_log).await?;
63 started.append(&mut records);
64 }
65
66 state.processes = started.iter().map(|proc| proc.record.clone()).collect();
67 state.touch().await?;
68
69 Ok(started)
70}
71
72pub async fn stop(state: &mut State) -> Result<()> {
74 let mut running: Vec<&mut ProcessRecord> = state
75 .processes
76 .iter_mut()
77 .filter(|record| matches!(record.last_status, ProcessStatus::Running))
78 .collect();
79
80 let mut errors = Vec::new();
81
82 for record in &mut running {
83 if let Some(shutdown) = &record.shutdown_handle {
84 shutdown.store(true, Ordering::SeqCst);
85 }
86
87 if let Some(pid) = current_pid(record) {
88 println!(
89 "sending {} to {} (pid {})",
90 signal_name(Signal::SIGTERM),
91 record.id,
92 pid
93 );
94 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
95 errors.push(format!(
96 "failed to send {} to {} (pid {}): {}",
97 signal_name(Signal::SIGTERM),
98 record.id,
99 pid,
100 err
101 ));
102 }
103 }
104 }
105
106 let deadline = Instant::now() + Duration::from_secs(5);
107 for record in &mut running {
108 if let Some(pid) = current_pid(record) {
109 while process_alive(pid as i32) && Instant::now() < deadline {
110 sleep(Duration::from_millis(200)).await;
111 }
112 if process_alive(pid as i32) {
113 println!(
114 "sending {} to {} (pid {})",
115 signal_name(Signal::SIGKILL),
116 record.id,
117 pid
118 );
119 if let Err(err) = send_signal(pid as i32, Signal::SIGKILL) {
120 errors.push(format!(
121 "failed to send {} to {} (pid {}): {}",
122 signal_name(Signal::SIGKILL),
123 record.id,
124 pid,
125 err
126 ));
127 }
128 if process_alive(pid as i32) {
129 errors.push(format!(
130 "{} (pid {}) still running after SIGKILL",
131 record.id, pid
132 ));
133 record.last_status = ProcessStatus::Unknown;
134 continue;
135 }
136 }
137 record.last_status = ProcessStatus::Stopped;
138 record.stopped_at = Some(OffsetDateTime::now_utc());
139 record.exit_code = None;
140 record.exit_signal = None;
141 continue;
142 }
143 errors.push(format!("missing pid for {} while stopping", record.id));
144 record.last_status = ProcessStatus::Unknown;
145 }
146
147 state.touch().await?;
148 if errors.is_empty() {
149 Ok(())
150 } else {
151 Err(anyhow!(errors.join("\n")))
152 }
153}
154
155async fn start_node(
157 layout: &AssetsLayout,
158 node_id: u32,
159 total_nodes: u32,
160 rust_log: &str,
161) -> Result<Vec<RunningProcess>> {
162 let mut records = Vec::new();
163
164 let node_record = spawn_node(layout, node_id, total_nodes, rust_log).await?;
165 records.push(node_record);
166
167 if let Some(sidecar_record) = spawn_sidecar(layout, node_id, total_nodes, rust_log).await? {
168 records.push(sidecar_record);
169 }
170
171 Ok(records)
172}
173
174async fn spawn_node(
176 layout: &AssetsLayout,
177 node_id: u32,
178 total_nodes: u32,
179 rust_log: &str,
180) -> Result<RunningProcess> {
181 let node_dir = layout.node_dir(node_id);
182
183 let stdout_path = layout.node_logs_dir(node_id).join("stdout.log");
184 let stderr_path = layout.node_logs_dir(node_id).join("stderr.log");
185
186 let mut launcher = Launcher::new_with_roots(
187 None,
188 layout.node_bin_dir(node_id),
189 layout.node_config_root(node_id),
190 )
191 .await?;
192 launcher.set_log_paths(stdout_path.clone(), stderr_path.clone());
193 launcher.set_cwd(node_dir.clone());
194 launcher.set_rust_log(rust_log.to_string());
195
196 let mut env = BTreeMap::new();
197 env.insert(
198 "CASPER_CONFIG_DIR".to_string(),
199 layout
200 .node_config_root(node_id)
201 .to_string_lossy()
202 .to_string(),
203 );
204 launcher.set_envs(env);
205
206 let (command_path, command_args) = launcher.current_command();
207 let child_pid = launcher.child_pid();
208 let shutdown = Arc::new(AtomicBool::new(false));
209 let shutdown_thread = Arc::clone(&shutdown);
210 let handle =
211 tokio::spawn(async move { launcher.run_with_shutdown(shutdown_thread.as_ref()).await });
212 let pid = wait_for_pid(child_pid.as_ref()).await;
213
214 Ok(RunningProcess {
215 record: ProcessRecord {
216 id: format!("node-{}", node_id),
217 node_id,
218 kind: ProcessKind::Node,
219 group: process_group(node_id, total_nodes),
220 command: command_path.to_string_lossy().to_string(),
221 args: command_args,
222 cwd: node_dir.to_string_lossy().to_string(),
223 pid,
224 pid_handle: Some(child_pid),
225 shutdown_handle: Some(shutdown),
226 stdout_path: stdout_path.to_string_lossy().to_string(),
227 stderr_path: stderr_path.to_string_lossy().to_string(),
228 started_at: Some(OffsetDateTime::now_utc()),
229 stopped_at: None,
230 exit_code: None,
231 exit_signal: None,
232 last_status: ProcessStatus::Running,
233 },
234 handle: ProcessHandle::Task(handle),
235 })
236}
237
238async fn spawn_sidecar(
240 layout: &AssetsLayout,
241 node_id: u32,
242 total_nodes: u32,
243 rust_log: &str,
244) -> Result<Option<RunningProcess>> {
245 let version_dir = layout.latest_protocol_version_dir(node_id).await?;
246 let command_path = layout
247 .node_bin_dir(node_id)
248 .join(&version_dir)
249 .join("casper-sidecar");
250 let config_path = layout
251 .node_config_root(node_id)
252 .join(&version_dir)
253 .join("sidecar.toml");
254
255 if !is_file(&command_path).await || !is_file(&config_path).await {
256 return Ok(None);
257 }
258
259 let node_dir = layout.node_dir(node_id);
260 let stdout_path = layout.node_logs_dir(node_id).join("sidecar-stdout.log");
261 let stderr_path = layout.node_logs_dir(node_id).join("sidecar-stderr.log");
262
263 let args = vec![
264 "--path-to-config".to_string(),
265 config_path.to_string_lossy().to_string(),
266 ];
267
268 let mut env = BTreeMap::new();
269 env.insert("RUST_LOG".to_string(), rust_log.to_string());
270
271 let child = spawn_process(
272 &command_path,
273 &args,
274 &env,
275 &node_dir,
276 &stdout_path,
277 &stderr_path,
278 )
279 .await?;
280 let pid = child.id();
281
282 Ok(Some(RunningProcess {
283 record: ProcessRecord {
284 id: format!("sidecar-{}", node_id),
285 node_id,
286 kind: ProcessKind::Sidecar,
287 group: process_group(node_id, total_nodes),
288 command: command_path.to_string_lossy().to_string(),
289 args,
290 cwd: node_dir.to_string_lossy().to_string(),
291 pid,
292 pid_handle: None,
293 shutdown_handle: None,
294 stdout_path: stdout_path.to_string_lossy().to_string(),
295 stderr_path: stderr_path.to_string_lossy().to_string(),
296 started_at: Some(OffsetDateTime::now_utc()),
297 stopped_at: None,
298 exit_code: None,
299 exit_signal: None,
300 last_status: ProcessStatus::Running,
301 },
302 handle: ProcessHandle::Child(child),
303 }))
304}
305
306async fn spawn_process(
308 command: &Path,
309 args: &[String],
310 env: &BTreeMap<String, String>,
311 cwd: &Path,
312 stdout_path: &Path,
313 stderr_path: &Path,
314) -> Result<tokio::process::Child> {
315 let stdout = OpenOptions::new()
316 .create(true)
317 .append(true)
318 .open(stdout_path)
319 .await?;
320 let stderr = OpenOptions::new()
321 .create(true)
322 .append(true)
323 .open(stderr_path)
324 .await?;
325 let stdout = stdout.into_std().await;
326 let stderr = stderr.into_std().await;
327
328 let mut cmd = Command::new(command);
329 cmd.args(args)
330 .envs(env)
331 .current_dir(cwd)
332 .stdout(stdout)
333 .stderr(stderr);
334
335 Ok(cmd.spawn()?)
336}
337
338fn process_group(node_id: u32, total_nodes: u32) -> ProcessGroup {
339 let genesis_nodes = total_nodes / 2;
340 if node_id <= BOOTSTRAP_NODES.min(genesis_nodes) {
341 ProcessGroup::Validators1
342 } else if node_id <= genesis_nodes {
343 ProcessGroup::Validators2
344 } else {
345 ProcessGroup::Validators3
346 }
347}
348
349fn process_alive(pid: i32) -> bool {
350 match kill(Pid::from_raw(pid), None) {
351 Ok(()) => true,
352 Err(Errno::ESRCH) => false,
353 Err(_) => true,
354 }
355}
356
357fn send_signal(target: i32, signal: Signal) -> Result<()> {
358 match kill(Pid::from_raw(target), signal) {
359 Ok(()) => Ok(()),
360 Err(Errno::ESRCH) => Ok(()),
361 Err(err) => Err(anyhow!(err)),
362 }
363}
364
365fn signal_name(signal: Signal) -> &'static str {
366 match signal {
367 Signal::SIGTERM => "SIGTERM",
368 Signal::SIGKILL => "SIGKILL",
369 _ => "signal",
370 }
371}
372
373async fn is_file(path: &Path) -> bool {
374 tokio_fs::metadata(path)
375 .await
376 .map(|meta| meta.is_file())
377 .unwrap_or(false)
378}
379
380async fn wait_for_pid(pid_handle: &AtomicU32) -> Option<u32> {
381 let deadline = Instant::now() + Duration::from_secs(1);
382 loop {
383 let pid = pid_handle.load(Ordering::SeqCst);
384 if pid != 0 {
385 return Some(pid);
386 }
387 if Instant::now() >= deadline {
388 return None;
389 }
390 sleep(Duration::from_millis(20)).await;
391 }
392}
393
394fn current_pid(record: &ProcessRecord) -> Option<u32> {
395 if let Some(handle) = &record.pid_handle {
396 let pid = handle.load(Ordering::SeqCst);
397 if pid != 0 {
398 return Some(pid);
399 }
400 }
401 record.pid
402}