1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use std::ffi::OsStr;
4use std::path::{Path, PathBuf};
5use std::sync::{
6 Arc,
7 atomic::{AtomicBool, AtomicU32, Ordering},
8};
9use std::time::{SystemTime, UNIX_EPOCH};
10use time::OffsetDateTime;
11use tokio::fs as tokio_fs;
12use tokio::sync::Mutex;
13use tokio::time::{Duration, sleep};
14use tracing::warn;
15
/// Base file name of the state file.
///
/// Within this file it is only used by `atomic_tmp_path` as the fallback base
/// name when the target path has no final component.
/// NOTE(review): callers elsewhere presumably join this onto a directory — confirm.
pub const STATE_FILE_NAME: &str = "state.json";
17
/// Kind of process described by a [`ProcessRecord`].
///
/// This file only stores and (de)serializes the value; it never branches on it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ProcessKind {
    Node,
    Sidecar,
}
24
/// Named group a [`ProcessRecord`] is assigned to.
///
/// This file only stores and (de)serializes the value; what distinguishes the
/// groups is defined by code outside this file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ProcessGroup {
    Validators1,
    Validators2,
    Validators3,
}
32
/// Last known status of a process, stored in [`ProcessRecord::last_status`].
///
/// Within this file only `Running` is inspected: the PID sync task stops
/// polling once a record is no longer `Running` and its PID handle reads zero.
/// The exact meaning of the other variants is defined by code outside this file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ProcessStatus {
    Running,
    Stopped,
    Exited,
    Unknown,
    Skipped,
}
42
/// One tracked process as stored in [`State::processes`].
///
/// The two handle fields are runtime-only (`#[serde(skip)]`): they are never
/// written to the state file and come back as `None` after deserialization.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessRecord {
    /// Identifier the PID sync tasks use to find this record again.
    pub id: String,
    pub node_id: u32,
    pub kind: ProcessKind,
    pub group: ProcessGroup,
    // NOTE(review): `command`, `args` and `cwd` presumably describe how the
    // process is launched; they are not read anywhere in this file.
    pub command: String,
    pub args: Vec<String>,
    pub cwd: String,
    /// Last recorded PID. Overwritten with `current_pid()` in the persisted
    /// snapshot and updated by the PID sync task when the handle changes.
    pub pid: Option<u32>,
    /// Shared live PID; a value of `0` is treated as "no PID".
    #[serde(skip)]
    pub pid_handle: Option<Arc<AtomicU32>>,
    /// Runtime-only flag; not read in this file.
    /// NOTE(review): presumably signals a shutdown request — confirm with callers.
    #[serde(skip)]
    pub shutdown_handle: Option<Arc<AtomicBool>>,
    // NOTE(review): presumably the files the process output is redirected to.
    pub stdout_path: String,
    pub stderr_path: String,
    /// Serialized as an optional RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339::option")]
    pub started_at: Option<OffsetDateTime>,
    /// Serialized as an optional RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339::option")]
    pub stopped_at: Option<OffsetDateTime>,
    pub exit_code: Option<i32>,
    pub exit_signal: Option<i32>,
    /// Status consulted by the PID sync task to decide when to stop polling.
    pub last_status: ProcessStatus,
}
68
69impl ProcessRecord {
70 pub fn current_pid(&self) -> Option<u32> {
71 if let Some(handle) = &self.pid_handle {
72 let pid = handle.load(Ordering::SeqCst);
73 if pid != 0 {
74 return Some(pid);
75 }
76 }
77 self.pid
78 }
79}
80
/// Serializable state document: bookkeeping timestamps plus the tracked
/// process records, together with the (non-serialized) path it is written to.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct State {
    /// Set once in `State::new`; serialized as an RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Refreshed by `State::touch` before each persist; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub updated_at: OffsetDateTime,
    /// Starts as `None` in `State::new`; not modified anywhere in this file.
    pub last_block_height: Option<u64>,
    /// Records of the tracked processes.
    pub processes: Vec<ProcessRecord>,
    /// File the state is persisted to; never serialized.
    /// NOTE(review): because of `#[serde(skip)]`, a deserialized `State` gets
    /// `PathBuf::default()` here — confirm that loaders restore the real path.
    #[serde(skip)]
    path: PathBuf,
}
93
94impl State {
95 pub async fn new(path: PathBuf) -> Result<Self> {
96 let now = OffsetDateTime::now_utc();
97 let state = Self {
98 created_at: now,
99 updated_at: now,
100 last_block_height: None,
101 processes: Vec::new(),
102 path,
103 };
104 state.persist().await?;
105 Ok(state)
106 }
107
108 pub async fn touch(&mut self) -> Result<()> {
109 self.updated_at = OffsetDateTime::now_utc();
110 self.persist().await
111 }
112
113 async fn persist(&self) -> Result<()> {
114 ensure_parent(&self.path).await?;
115 let mut snapshot = self.clone();
116 for process in &mut snapshot.processes {
117 process.pid = process.current_pid();
118 process.pid_handle = None;
119 process.shutdown_handle = None;
120 }
121 let contents = serde_json::to_string_pretty(&snapshot)?;
122 write_atomic(&self.path, contents).await
123 }
124}
125
126pub async fn spawn_pid_sync_tasks(state: Arc<Mutex<State>>) {
127 let tracked = {
128 let state = state.lock().await;
129 state
130 .processes
131 .iter()
132 .filter_map(|process| {
133 process
134 .pid_handle
135 .as_ref()
136 .map(|handle| (process.id.clone(), Arc::clone(handle)))
137 })
138 .collect::<Vec<_>>()
139 };
140
141 for (process_id, pid_handle) in tracked {
142 spawn_pid_sync_task(Arc::clone(&state), process_id, pid_handle);
143 }
144}
145
146pub async fn spawn_pid_sync_tasks_for_ids(state: Arc<Mutex<State>>, process_ids: &[String]) {
147 let tracked = {
148 let state = state.lock().await;
149 state
150 .processes
151 .iter()
152 .filter(|process| process_ids.iter().any(|id| id == &process.id))
153 .filter_map(|process| {
154 process
155 .pid_handle
156 .as_ref()
157 .map(|handle| (process.id.clone(), Arc::clone(handle)))
158 })
159 .collect::<Vec<_>>()
160 };
161
162 for (process_id, pid_handle) in tracked {
163 spawn_pid_sync_task(Arc::clone(&state), process_id, pid_handle);
164 }
165}
166
167fn spawn_pid_sync_task(state: Arc<Mutex<State>>, process_id: String, pid_handle: Arc<AtomicU32>) {
168 tokio::spawn(async move {
169 let mut last_seen = Some(u32::MAX);
170 loop {
171 let current_pid = {
172 let pid = pid_handle.load(Ordering::SeqCst);
173 (pid != 0).then_some(pid)
174 };
175 let should_exit;
176
177 if current_pid != last_seen {
178 last_seen = current_pid;
179 let mut state = state.lock().await;
180 let Some(process) = state
181 .processes
182 .iter_mut()
183 .find(|process| process.id == process_id)
184 else {
185 return;
186 };
187
188 process.pid = current_pid;
189 should_exit =
190 !matches!(process.last_status, ProcessStatus::Running) && current_pid.is_none();
191
192 if let Err(error) = state.touch().await {
193 warn!(
194 %error,
195 process_id,
196 "failed to persist updated process pid"
197 );
198 return;
199 }
200 } else {
201 let state = state.lock().await;
202 let Some(process) = state
203 .processes
204 .iter()
205 .find(|process| process.id == process_id)
206 else {
207 return;
208 };
209 should_exit =
210 !matches!(process.last_status, ProcessStatus::Running) && current_pid.is_none();
211 }
212
213 if should_exit {
214 return;
215 }
216
217 sleep(Duration::from_millis(100)).await;
218 }
219 });
220}
221
222async fn ensure_parent(path: &Path) -> Result<()> {
223 if let Some(parent) = path.parent() {
224 tokio_fs::create_dir_all(parent).await?;
225 }
226 Ok(())
227}
228
229async fn write_atomic(path: &Path, contents: String) -> Result<()> {
230 let suffix = SystemTime::now()
231 .duration_since(UNIX_EPOCH)
232 .map(|duration| duration.as_nanos())
233 .unwrap_or(0);
234 let tmp_path = atomic_tmp_path(path, suffix);
235 tokio_fs::write(&tmp_path, contents).await?;
236 if let Err(err) = tokio_fs::rename(&tmp_path, path).await {
237 let _ = tokio_fs::remove_file(&tmp_path).await;
238 return Err(err.into());
239 }
240 Ok(())
241}
242
243fn atomic_tmp_path(path: &Path, suffix: u128) -> PathBuf {
244 let parent = path
245 .parent()
246 .filter(|parent| !parent.as_os_str().is_empty())
247 .unwrap_or_else(|| Path::new("."));
248 let base_name = path
249 .file_name()
250 .unwrap_or_else(|| OsStr::new(STATE_FILE_NAME))
251 .to_string_lossy();
252 parent.join(format!(
253 ".{}.{}.{}.tmp",
254 base_name,
255 std::process::id(),
256 suffix
257 ))
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;
    use tempfile::TempDir;

    /// Builds a `Running` record named `node-1` with the given PID sources.
    fn test_record(pid: Option<u32>, pid_handle: Option<Arc<AtomicU32>>) -> ProcessRecord {
        ProcessRecord {
            id: String::from("node-1"),
            node_id: 1,
            kind: ProcessKind::Node,
            group: ProcessGroup::Validators1,
            command: String::from("/tmp/casper-node"),
            args: vec![String::from("validator")],
            cwd: String::from("/tmp/network"),
            pid,
            pid_handle,
            shutdown_handle: None,
            stdout_path: String::from("/tmp/stdout.log"),
            stderr_path: String::from("/tmp/stderr.log"),
            started_at: None,
            stopped_at: None,
            exit_code: None,
            exit_signal: None,
            last_status: ProcessStatus::Running,
        }
    }

    /// Reads the persisted state file and returns the first process's `pid`.
    async fn read_pid(path: &Path) -> Option<u64> {
        let raw = tokio_fs::read_to_string(path).await.unwrap();
        let document: Value = serde_json::from_str(&raw).unwrap();
        document["processes"][0]["pid"].as_u64()
    }

    #[test]
    fn atomic_tmp_path_uses_state_file_parent() {
        let state_path = Path::new("/tmp/devnet/networks/casper-dev/state.json");
        let tmp_path = atomic_tmp_path(state_path, 42);

        assert_eq!(tmp_path.parent(), state_path.parent());

        let tmp_name = tmp_path.file_name().unwrap().to_string_lossy();
        assert!(tmp_name.starts_with(".state.json."));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn touch_persists_current_pid_from_handle() {
        let workspace = TempDir::new().unwrap();
        let state_path = workspace.path().join("state.json");
        let shared_pid = Arc::new(AtomicU32::new(4242));

        let mut state = State::new(state_path.clone()).await.unwrap();
        let record = test_record(Some(1111), Some(Arc::clone(&shared_pid)));
        state.processes.push(record);
        state.touch().await.unwrap();

        // The live handle value must win over the stale `pid` field.
        assert_eq!(read_pid(&state_path).await, Some(4242));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn pid_sync_task_updates_state_when_pid_changes() {
        let workspace = TempDir::new().unwrap();
        let state_path = workspace.path().join("state.json");
        let shared_pid = Arc::new(AtomicU32::new(5001));

        let mut state = State::new(state_path.clone()).await.unwrap();
        let record = test_record(Some(5001), Some(Arc::clone(&shared_pid)));
        state.processes.push(record);
        state.touch().await.unwrap();

        let shared_state = Arc::new(Mutex::new(state));
        spawn_pid_sync_tasks(Arc::clone(&shared_state)).await;
        shared_pid.store(6002, Ordering::SeqCst);

        // Poll the file for up to 50 attempts, 20 ms apart.
        let mut attempts_left = 50;
        while attempts_left > 0 {
            if read_pid(&state_path).await == Some(6002) {
                return;
            }
            sleep(Duration::from_millis(20)).await;
            attempts_left -= 1;
        }

        panic!("timed out waiting for pid sync task to persist updated pid");
    }
}