1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use std::ffi::OsStr;
4use std::path::{Path, PathBuf};
5use std::sync::{
6 Arc,
7 atomic::{AtomicBool, AtomicU32, Ordering},
8};
9use std::time::{SystemTime, UNIX_EPOCH};
10use time::OffsetDateTime;
11use tokio::fs as tokio_fs;
12use tokio::sync::Mutex;
13use tokio::time::{Duration, sleep};
14use tracing::warn;
15
/// Conventional file name of the persisted state file.
///
/// Within this module it is only used by `write_atomic`, as the base name of
/// the temporary file when the target path has no file-name component.
pub const STATE_FILE_NAME: &str = "state.json";
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
20pub enum ProcessKind {
21 Node,
22 Sidecar,
23}
24
25#[derive(Clone, Debug, Serialize, Deserialize)]
27pub enum ProcessGroup {
28 Validators1,
29 Validators2,
30 Validators3,
31}
32
33#[derive(Clone, Debug, Serialize, Deserialize)]
35pub enum ProcessStatus {
36 Running,
37 Stopped,
38 Exited,
39 Unknown,
40 Skipped,
41}
42
43#[derive(Clone, Debug, Serialize, Deserialize)]
45pub struct ProcessRecord {
46 pub id: String,
47 pub node_id: u32,
48 pub kind: ProcessKind,
49 pub group: ProcessGroup,
50 pub command: String,
51 pub args: Vec<String>,
52 pub cwd: String,
53 pub pid: Option<u32>,
54 #[serde(skip)]
55 pub pid_handle: Option<Arc<AtomicU32>>,
56 #[serde(skip)]
57 pub shutdown_handle: Option<Arc<AtomicBool>>,
58 pub stdout_path: String,
59 pub stderr_path: String,
60 #[serde(with = "time::serde::rfc3339::option")]
61 pub started_at: Option<OffsetDateTime>,
62 #[serde(with = "time::serde::rfc3339::option")]
63 pub stopped_at: Option<OffsetDateTime>,
64 pub exit_code: Option<i32>,
65 pub exit_signal: Option<i32>,
66 pub last_status: ProcessStatus,
67}
68
69impl ProcessRecord {
70 pub fn current_pid(&self) -> Option<u32> {
71 if let Some(handle) = &self.pid_handle {
72 let pid = handle.load(Ordering::SeqCst);
73 if pid != 0 {
74 return Some(pid);
75 }
76 }
77 self.pid
78 }
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize)]
83pub struct State {
84 #[serde(with = "time::serde::rfc3339")]
85 pub created_at: OffsetDateTime,
86 #[serde(with = "time::serde::rfc3339")]
87 pub updated_at: OffsetDateTime,
88 pub last_block_height: Option<u64>,
89 pub processes: Vec<ProcessRecord>,
90 #[serde(skip)]
91 path: PathBuf,
92}
93
94impl State {
95 pub async fn new(path: PathBuf) -> Result<Self> {
96 let now = OffsetDateTime::now_utc();
97 let state = Self {
98 created_at: now,
99 updated_at: now,
100 last_block_height: None,
101 processes: Vec::new(),
102 path,
103 };
104 state.persist().await?;
105 Ok(state)
106 }
107
108 pub async fn touch(&mut self) -> Result<()> {
109 self.updated_at = OffsetDateTime::now_utc();
110 self.persist().await
111 }
112
113 async fn persist(&self) -> Result<()> {
114 ensure_parent(&self.path).await?;
115 let mut snapshot = self.clone();
116 for process in &mut snapshot.processes {
117 process.pid = process.current_pid();
118 process.pid_handle = None;
119 process.shutdown_handle = None;
120 }
121 let contents = serde_json::to_string_pretty(&snapshot)?;
122 write_atomic(&self.path, contents).await
123 }
124}
125
126pub async fn spawn_pid_sync_tasks(state: Arc<Mutex<State>>) {
127 let tracked = {
128 let state = state.lock().await;
129 state
130 .processes
131 .iter()
132 .filter_map(|process| {
133 process
134 .pid_handle
135 .as_ref()
136 .map(|handle| (process.id.clone(), Arc::clone(handle)))
137 })
138 .collect::<Vec<_>>()
139 };
140
141 for (process_id, pid_handle) in tracked {
142 spawn_pid_sync_task(Arc::clone(&state), process_id, pid_handle);
143 }
144}
145
146pub async fn spawn_pid_sync_tasks_for_ids(state: Arc<Mutex<State>>, process_ids: &[String]) {
147 let tracked = {
148 let state = state.lock().await;
149 state
150 .processes
151 .iter()
152 .filter(|process| process_ids.iter().any(|id| id == &process.id))
153 .filter_map(|process| {
154 process
155 .pid_handle
156 .as_ref()
157 .map(|handle| (process.id.clone(), Arc::clone(handle)))
158 })
159 .collect::<Vec<_>>()
160 };
161
162 for (process_id, pid_handle) in tracked {
163 spawn_pid_sync_task(Arc::clone(&state), process_id, pid_handle);
164 }
165}
166
167fn spawn_pid_sync_task(state: Arc<Mutex<State>>, process_id: String, pid_handle: Arc<AtomicU32>) {
168 tokio::spawn(async move {
169 let mut last_seen = Some(u32::MAX);
170 loop {
171 let current_pid = {
172 let pid = pid_handle.load(Ordering::SeqCst);
173 (pid != 0).then_some(pid)
174 };
175 let should_exit;
176
177 if current_pid != last_seen {
178 last_seen = current_pid;
179 let mut state = state.lock().await;
180 let Some(process) = state
181 .processes
182 .iter_mut()
183 .find(|process| process.id == process_id)
184 else {
185 return;
186 };
187
188 process.pid = current_pid;
189 should_exit =
190 !matches!(process.last_status, ProcessStatus::Running) && current_pid.is_none();
191
192 if let Err(error) = state.touch().await {
193 warn!(
194 %error,
195 process_id,
196 "failed to persist updated process pid"
197 );
198 return;
199 }
200 } else {
201 let state = state.lock().await;
202 let Some(process) = state
203 .processes
204 .iter()
205 .find(|process| process.id == process_id)
206 else {
207 return;
208 };
209 should_exit =
210 !matches!(process.last_status, ProcessStatus::Running) && current_pid.is_none();
211 }
212
213 if should_exit {
214 return;
215 }
216
217 sleep(Duration::from_millis(100)).await;
218 }
219 });
220}
221
222async fn ensure_parent(path: &Path) -> Result<()> {
223 if let Some(parent) = path.parent() {
224 tokio_fs::create_dir_all(parent).await?;
225 }
226 Ok(())
227}
228
229async fn write_atomic(path: &Path, contents: String) -> Result<()> {
230 let base_name = path
231 .file_name()
232 .unwrap_or_else(|| OsStr::new(STATE_FILE_NAME))
233 .to_string_lossy();
234 let suffix = SystemTime::now()
235 .duration_since(UNIX_EPOCH)
236 .map(|duration| duration.as_nanos())
237 .unwrap_or(0);
238 let tmp_dir = tempfile::Builder::new()
239 .prefix("casper-devnet-state")
240 .tempdir()?;
241 let tmp_name = format!("{}.{}.{}.tmp", base_name, std::process::id(), suffix);
242 let tmp_path = tmp_dir.path().join(tmp_name);
243 tokio_fs::write(&tmp_path, contents).await?;
244 if let Err(err) = tokio_fs::rename(&tmp_path, path).await {
245 let _ = tokio_fs::remove_file(&tmp_path).await;
246 return Err(err.into());
247 }
248 Ok(())
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;
    use tempfile::TempDir;

    /// Builds a `Running` record for `node-1` with the given stored PID and live handle.
    fn test_record(pid: Option<u32>, pid_handle: Option<Arc<AtomicU32>>) -> ProcessRecord {
        ProcessRecord {
            id: "node-1".to_string(),
            node_id: 1,
            kind: ProcessKind::Node,
            group: ProcessGroup::Validators1,
            command: "/tmp/casper-node".to_string(),
            args: vec!["validator".to_string()],
            cwd: "/tmp/network".to_string(),
            pid,
            pid_handle,
            shutdown_handle: None,
            stdout_path: "/tmp/stdout.log".to_string(),
            stderr_path: "/tmp/stderr.log".to_string(),
            started_at: None,
            stopped_at: None,
            exit_code: None,
            exit_signal: None,
            last_status: ProcessStatus::Running,
        }
    }

    /// Reads the persisted `pid` of the first process record in the state file at `path`.
    async fn read_pid(path: &Path) -> Option<u64> {
        let contents = tokio_fs::read_to_string(path).await.unwrap();
        let value: Value = serde_json::from_str(&contents).unwrap();
        value["processes"][0]["pid"].as_u64()
    }

    /// Polls the state file until the first record's PID equals `expected`.
    /// Returns `false` after roughly one second without a match.
    async fn wait_for_pid(path: &Path, expected: Option<u64>) -> bool {
        for _ in 0..50 {
            if read_pid(path).await == expected {
                return true;
            }
            sleep(Duration::from_millis(20)).await;
        }
        false
    }

    #[test]
    fn current_pid_prefers_live_handle_and_falls_back_on_zero() {
        let live = test_record(Some(1111), Some(Arc::new(AtomicU32::new(7))));
        assert_eq!(live.current_pid(), Some(7));

        let zeroed = test_record(Some(1111), Some(Arc::new(AtomicU32::new(0))));
        assert_eq!(zeroed.current_pid(), Some(1111));

        assert_eq!(test_record(Some(42), None).current_pid(), Some(42));
        assert_eq!(test_record(None, None).current_pid(), None);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn touch_persists_current_pid_from_handle() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");
        let pid_handle = Arc::new(AtomicU32::new(4242));

        let mut state = State::new(state_path.clone()).await.unwrap();
        state
            .processes
            .push(test_record(Some(1111), Some(Arc::clone(&pid_handle))));
        state.touch().await.unwrap();

        assert_eq!(read_pid(&state_path).await, Some(4242));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn persist_leaves_only_the_state_file() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join(STATE_FILE_NAME);

        let mut state = State::new(state_path.clone()).await.unwrap();
        state.touch().await.unwrap();

        let mut entries = tokio_fs::read_dir(temp_dir.path()).await.unwrap();
        let mut names = Vec::new();
        while let Some(entry) = entries.next_entry().await.unwrap() {
            names.push(entry.file_name());
        }
        assert_eq!(names, vec![std::ffi::OsString::from(STATE_FILE_NAME)]);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn pid_sync_task_updates_state_when_pid_changes() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");
        let pid_handle = Arc::new(AtomicU32::new(5001));

        let mut state = State::new(state_path.clone()).await.unwrap();
        state
            .processes
            .push(test_record(Some(5001), Some(Arc::clone(&pid_handle))));
        state.touch().await.unwrap();

        let state = Arc::new(Mutex::new(state));
        spawn_pid_sync_tasks(Arc::clone(&state)).await;
        // Yield to the runtime so the sync task observes 5001 first. The update
        // below then exercises change detection rather than the initial sync.
        sleep(Duration::from_millis(20)).await;
        pid_handle.store(6002, Ordering::SeqCst);

        assert!(
            wait_for_pid(&state_path, Some(6002)).await,
            "timed out waiting for pid sync task to persist updated pid"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn pid_sync_task_clears_pid_when_handle_is_reset() {
        let temp_dir = TempDir::new().unwrap();
        let state_path = temp_dir.path().join("state.json");
        let pid_handle = Arc::new(AtomicU32::new(5001));

        let mut state = State::new(state_path.clone()).await.unwrap();
        state
            .processes
            .push(test_record(Some(5001), Some(Arc::clone(&pid_handle))));
        state.touch().await.unwrap();

        let state = Arc::new(Mutex::new(state));
        spawn_pid_sync_tasks(Arc::clone(&state)).await;
        sleep(Duration::from_millis(20)).await;
        pid_handle.store(0, Ordering::SeqCst);

        assert!(
            wait_for_pid(&state_path, None).await,
            "timed out waiting for pid sync task to persist a cleared pid"
        );
    }
}