1use crate::assets::{AssetsLayout, BOOTSTRAP_NODES};
2use crate::node_launcher::Launcher;
3use crate::state::{ProcessGroup, ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{Result, anyhow};
5use nix::errno::Errno;
6use nix::sys::signal::{Signal, kill};
7use nix::unistd::Pid;
8use std::collections::BTreeMap;
9use std::path::Path;
10use std::sync::{
11 Arc,
12 atomic::{AtomicBool, AtomicU32, Ordering},
13};
14use std::time::{Duration, Instant};
15use time::OffsetDateTime;
16use tokio::fs as tokio_fs;
17use tokio::fs::OpenOptions;
18use tokio::process::Command;
19use tokio::time::sleep;
20
/// A process spawned by [`start`]: its descriptive record together with the
/// handle that owns it inside this process.
pub struct RunningProcess {
    /// Description of the process; [`start`] copies these records into
    /// `State::processes`.
    pub record: ProcessRecord,
    /// Owning handle: a tokio task running the node's launcher, or the child
    /// process of a directly spawned sidecar.
    pub handle: ProcessHandle,
}
26
/// In-process handle for something spawned by [`start`].
pub enum ProcessHandle {
    /// A process spawned directly via `tokio::process::Command` (used for sidecars).
    Child(tokio::process::Child),
    /// A tokio task driving a `Launcher` that manages the node process (used for nodes).
    Task(tokio::task::JoinHandle<Result<()>>),
}
32
/// Options controlling how [`start`] launches the network.
pub struct StartPlan {
    /// Log filter handed to each node's launcher (`set_rust_log`) and exported
    /// as `RUST_LOG` to each sidecar.
    pub rust_log: String,
}
37
38pub async fn start(
40 layout: &AssetsLayout,
41 plan: &StartPlan,
42 state: &mut State,
43) -> Result<Vec<RunningProcess>> {
44 let total_nodes = layout.count_nodes().await?;
45 if total_nodes == 0 {
46 return Err(anyhow!(
47 "no nodes found under {}",
48 layout.nodes_dir().display()
49 ));
50 }
51 let node_ids: Vec<u32> = (1..=total_nodes).collect();
52
53 let mut started = Vec::new();
54 for node_id in node_ids {
55 if node_id > total_nodes {
56 return Err(anyhow!(
57 "node {} exceeds total nodes {}",
58 node_id,
59 total_nodes
60 ));
61 }
62 let mut records = start_node(layout, node_id, total_nodes, &plan.rust_log).await?;
63 started.append(&mut records);
64 }
65
66 state.processes = started.iter().map(|proc| proc.record.clone()).collect();
67 state.touch().await?;
68
69 Ok(started)
70}
71
72pub async fn stop(state: &mut State) -> Result<()> {
74 let mut running: Vec<&mut ProcessRecord> = state
75 .processes
76 .iter_mut()
77 .filter(|record| matches!(record.last_status, ProcessStatus::Running))
78 .collect();
79
80 let mut errors = Vec::new();
81
82 for record in &mut running {
83 if let Some(shutdown) = &record.shutdown_handle {
84 shutdown.store(true, Ordering::SeqCst);
85 }
86
87 if let Some(pid) = current_pid(record) {
88 eprintln!(
89 "sending {} to {} (pid {})",
90 signal_name(Signal::SIGTERM),
91 record.id,
92 pid
93 );
94 if let Err(err) = send_signal(pid as i32, Signal::SIGTERM) {
95 errors.push(format!(
96 "failed to send {} to {} (pid {}): {}",
97 signal_name(Signal::SIGTERM),
98 record.id,
99 pid,
100 err
101 ));
102 }
103 }
104 }
105
106 let deadline = Instant::now() + Duration::from_secs(5);
107 for record in &mut running {
108 if let Some(pid) = current_pid(record) {
109 while process_alive(pid as i32) && Instant::now() < deadline {
110 sleep(Duration::from_millis(200)).await;
111 }
112 if process_alive(pid as i32) {
113 eprintln!(
114 "sending {} to {} (pid {})",
115 signal_name(Signal::SIGKILL),
116 record.id,
117 pid
118 );
119 if let Err(err) = send_signal(pid as i32, Signal::SIGKILL) {
120 errors.push(format!(
121 "failed to send {} to {} (pid {}): {}",
122 signal_name(Signal::SIGKILL),
123 record.id,
124 pid,
125 err
126 ));
127 }
128 if process_alive(pid as i32) {
129 errors.push(format!(
130 "{} (pid {}) still running after SIGKILL",
131 record.id, pid
132 ));
133 record.last_status = ProcessStatus::Unknown;
134 continue;
135 }
136 }
137 record.last_status = ProcessStatus::Stopped;
138 record.stopped_at = Some(OffsetDateTime::now_utc());
139 record.exit_code = None;
140 record.exit_signal = None;
141 continue;
142 }
143 errors.push(format!("missing pid for {} while stopping", record.id));
144 record.last_status = ProcessStatus::Unknown;
145 }
146
147 state.touch().await?;
148 if errors.is_empty() {
149 Ok(())
150 } else {
151 Err(anyhow!(errors.join("\n")))
152 }
153}
154
155async fn start_node(
157 layout: &AssetsLayout,
158 node_id: u32,
159 total_nodes: u32,
160 rust_log: &str,
161) -> Result<Vec<RunningProcess>> {
162 let mut records = Vec::new();
163
164 let node_record = spawn_node(layout, node_id, total_nodes, rust_log).await?;
165 records.push(node_record);
166
167 if let Some(sidecar_record) = spawn_sidecar(layout, node_id, total_nodes, rust_log).await? {
168 records.push(sidecar_record);
169 }
170
171 Ok(records)
172}
173
174async fn spawn_node(
176 layout: &AssetsLayout,
177 node_id: u32,
178 total_nodes: u32,
179 rust_log: &str,
180) -> Result<RunningProcess> {
181 let node_dir = layout.node_dir(node_id);
182
183 let stdout_path = layout.node_logs_dir(node_id).join("stdout.log");
184 let stderr_path = layout.node_logs_dir(node_id).join("stderr.log");
185 create_log_symlinks(
186 layout,
187 node_id,
188 ProcessKind::Node,
189 &stdout_path,
190 &stderr_path,
191 )
192 .await?;
193
194 let mut launcher = Launcher::new_with_roots(
195 None,
196 layout.node_bin_dir(node_id),
197 layout.node_config_root(node_id),
198 )
199 .await?;
200 launcher.set_log_paths(stdout_path.clone(), stderr_path.clone());
201 launcher.set_cwd(node_dir.clone());
202 launcher.set_rust_log(rust_log.to_string());
203
204 let mut env = BTreeMap::new();
205 env.insert(
206 "CASPER_CONFIG_DIR".to_string(),
207 layout
208 .node_config_root(node_id)
209 .to_string_lossy()
210 .to_string(),
211 );
212 launcher.set_envs(env);
213
214 let (command_path, command_args) = launcher.current_command();
215 let child_pid = launcher.child_pid();
216 let shutdown = Arc::new(AtomicBool::new(false));
217 let shutdown_thread = Arc::clone(&shutdown);
218 let handle =
219 tokio::spawn(async move { launcher.run_with_shutdown(shutdown_thread.as_ref()).await });
220 let pid = wait_for_pid(child_pid.as_ref()).await;
221
222 Ok(RunningProcess {
223 record: ProcessRecord {
224 id: format!("node-{}", node_id),
225 node_id,
226 kind: ProcessKind::Node,
227 group: process_group(node_id, total_nodes),
228 command: command_path.to_string_lossy().to_string(),
229 args: command_args,
230 cwd: node_dir.to_string_lossy().to_string(),
231 pid,
232 pid_handle: Some(child_pid),
233 shutdown_handle: Some(shutdown),
234 stdout_path: stdout_path.to_string_lossy().to_string(),
235 stderr_path: stderr_path.to_string_lossy().to_string(),
236 started_at: Some(OffsetDateTime::now_utc()),
237 stopped_at: None,
238 exit_code: None,
239 exit_signal: None,
240 last_status: ProcessStatus::Running,
241 },
242 handle: ProcessHandle::Task(handle),
243 })
244}
245
246async fn spawn_sidecar(
248 layout: &AssetsLayout,
249 node_id: u32,
250 total_nodes: u32,
251 rust_log: &str,
252) -> Result<Option<RunningProcess>> {
253 let version_dir = layout.latest_protocol_version_dir(node_id).await?;
254 let command_path = layout
255 .node_bin_dir(node_id)
256 .join(&version_dir)
257 .join("casper-sidecar");
258 let config_path = layout
259 .node_config_root(node_id)
260 .join(&version_dir)
261 .join("sidecar.toml");
262
263 if !is_file(&command_path).await || !is_file(&config_path).await {
264 return Ok(None);
265 }
266
267 let node_dir = layout.node_dir(node_id);
268 let stdout_path = layout.node_logs_dir(node_id).join("sidecar-stdout.log");
269 let stderr_path = layout.node_logs_dir(node_id).join("sidecar-stderr.log");
270 create_log_symlinks(
271 layout,
272 node_id,
273 ProcessKind::Sidecar,
274 &stdout_path,
275 &stderr_path,
276 )
277 .await?;
278
279 let args = vec![
280 "--path-to-config".to_string(),
281 config_path.to_string_lossy().to_string(),
282 ];
283
284 let mut env = BTreeMap::new();
285 env.insert("RUST_LOG".to_string(), rust_log.to_string());
286
287 let child = spawn_process(
288 &command_path,
289 &args,
290 &env,
291 &node_dir,
292 &stdout_path,
293 &stderr_path,
294 )
295 .await?;
296 let pid = child.id();
297
298 Ok(Some(RunningProcess {
299 record: ProcessRecord {
300 id: format!("sidecar-{}", node_id),
301 node_id,
302 kind: ProcessKind::Sidecar,
303 group: process_group(node_id, total_nodes),
304 command: command_path.to_string_lossy().to_string(),
305 args,
306 cwd: node_dir.to_string_lossy().to_string(),
307 pid,
308 pid_handle: None,
309 shutdown_handle: None,
310 stdout_path: stdout_path.to_string_lossy().to_string(),
311 stderr_path: stderr_path.to_string_lossy().to_string(),
312 started_at: Some(OffsetDateTime::now_utc()),
313 stopped_at: None,
314 exit_code: None,
315 exit_signal: None,
316 last_status: ProcessStatus::Running,
317 },
318 handle: ProcessHandle::Child(child),
319 }))
320}
321
322async fn spawn_process(
324 command: &Path,
325 args: &[String],
326 env: &BTreeMap<String, String>,
327 cwd: &Path,
328 stdout_path: &Path,
329 stderr_path: &Path,
330) -> Result<tokio::process::Child> {
331 let stdout = OpenOptions::new()
332 .create(true)
333 .append(true)
334 .open(stdout_path)
335 .await?;
336 let stderr = OpenOptions::new()
337 .create(true)
338 .append(true)
339 .open(stderr_path)
340 .await?;
341 let stdout = stdout.into_std().await;
342 let stderr = stderr.into_std().await;
343
344 let mut cmd = Command::new(command);
345 cmd.args(args)
346 .envs(env)
347 .current_dir(cwd)
348 .stdout(stdout)
349 .stderr(stderr);
350
351 Ok(cmd.spawn()?)
352}
353
354async fn create_log_symlinks(
355 layout: &AssetsLayout,
356 node_id: u32,
357 kind: ProcessKind,
358 stdout_path: &Path,
359 stderr_path: &Path,
360) -> Result<()> {
361 let data_dir = layout.net_dir();
362 tokio_fs::create_dir_all(&data_dir).await?;
363 let prefix = match kind {
364 ProcessKind::Node => format!("node-{}", node_id),
365 ProcessKind::Sidecar => format!("sidecar-{}", node_id),
366 };
367 let stdout_link = data_dir.join(format!("{}.stdout", prefix));
368 let stderr_link = data_dir.join(format!("{}.stderr", prefix));
369 create_symlink(&stdout_link, stdout_path).await?;
370 create_symlink(&stderr_link, stderr_path).await?;
371 Ok(())
372}
373
374async fn create_symlink(link_path: &Path, target_path: &Path) -> Result<()> {
375 if let Ok(metadata) = tokio_fs::symlink_metadata(link_path).await {
376 if metadata.is_dir() {
377 tokio_fs::remove_dir_all(link_path).await?;
378 } else {
379 tokio_fs::remove_file(link_path).await?;
380 }
381 }
382 tokio_fs::symlink(target_path, link_path).await?;
383 Ok(())
384}
385
386fn process_group(node_id: u32, total_nodes: u32) -> ProcessGroup {
387 let genesis_nodes = total_nodes / 2;
388 if node_id <= BOOTSTRAP_NODES.min(genesis_nodes) {
389 ProcessGroup::Validators1
390 } else if node_id <= genesis_nodes {
391 ProcessGroup::Validators2
392 } else {
393 ProcessGroup::Validators3
394 }
395}
396
397fn process_alive(pid: i32) -> bool {
398 match kill(Pid::from_raw(pid), None) {
399 Ok(()) => true,
400 Err(Errno::ESRCH) => false,
401 Err(_) => true,
402 }
403}
404
405fn send_signal(target: i32, signal: Signal) -> Result<()> {
406 match kill(Pid::from_raw(target), signal) {
407 Ok(()) => Ok(()),
408 Err(Errno::ESRCH) => Ok(()),
409 Err(err) => Err(anyhow!(err)),
410 }
411}
412
413fn signal_name(signal: Signal) -> &'static str {
414 match signal {
415 Signal::SIGTERM => "SIGTERM",
416 Signal::SIGKILL => "SIGKILL",
417 _ => "signal",
418 }
419}
420
421async fn is_file(path: &Path) -> bool {
422 tokio_fs::metadata(path)
423 .await
424 .map(|meta| meta.is_file())
425 .unwrap_or(false)
426}
427
428async fn wait_for_pid(pid_handle: &AtomicU32) -> Option<u32> {
429 let deadline = Instant::now() + Duration::from_secs(1);
430 loop {
431 let pid = pid_handle.load(Ordering::SeqCst);
432 if pid != 0 {
433 return Some(pid);
434 }
435 if Instant::now() >= deadline {
436 return None;
437 }
438 sleep(Duration::from_millis(20)).await;
439 }
440}
441
442fn current_pid(record: &ProcessRecord) -> Option<u32> {
443 if let Some(handle) = &record.pid_handle {
444 let pid = handle.load(Ordering::SeqCst);
445 if pid != 0 {
446 return Some(pid);
447 }
448 }
449 record.pid
450}