// casper_devnet/state.rs

1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use std::ffi::OsStr;
4use std::path::{Path, PathBuf};
5use std::sync::{
6    Arc,
7    atomic::{AtomicBool, AtomicU32, Ordering},
8};
9use std::time::{SystemTime, UNIX_EPOCH};
10use time::OffsetDateTime;
11use tokio::fs as tokio_fs;
12use tokio::sync::Mutex;
13use tokio::time::{Duration, sleep};
14use tracing::warn;
15
/// Name of the persisted state file; `atomic_tmp_path` also falls back to it as the
/// base name when the target path has no file-name component.
pub const STATE_FILE_NAME: &str = "state.json";
17
18/// Process classification used in logs and reporting.
19#[derive(Clone, Debug, Serialize, Deserialize)]
20pub enum ProcessKind {
21    Node,
22    Sidecar,
23}
24
25/// Logical process grouping (kept for parity with NCTL UX).
26#[derive(Clone, Debug, Serialize, Deserialize)]
27pub enum ProcessGroup {
28    Validators1,
29    Validators2,
30    Validators3,
31}
32
33/// Runtime status of a tracked process.
34#[derive(Clone, Debug, Serialize, Deserialize)]
35pub enum ProcessStatus {
36    Running,
37    Stopped,
38    Exited,
39    Unknown,
40    Skipped,
41}
42
43/// Persisted record of a process and its lifecycle details.
44#[derive(Clone, Debug, Serialize, Deserialize)]
45pub struct ProcessRecord {
46    pub id: String,
47    pub node_id: u32,
48    pub kind: ProcessKind,
49    pub group: ProcessGroup,
50    pub command: String,
51    pub args: Vec<String>,
52    pub cwd: String,
53    pub pid: Option<u32>,
54    #[serde(skip)]
55    pub pid_handle: Option<Arc<AtomicU32>>,
56    #[serde(skip)]
57    pub shutdown_handle: Option<Arc<AtomicBool>>,
58    pub stdout_path: String,
59    pub stderr_path: String,
60    #[serde(with = "time::serde::rfc3339::option")]
61    pub started_at: Option<OffsetDateTime>,
62    #[serde(with = "time::serde::rfc3339::option")]
63    pub stopped_at: Option<OffsetDateTime>,
64    pub exit_code: Option<i32>,
65    pub exit_signal: Option<i32>,
66    pub last_status: ProcessStatus,
67}
68
69impl ProcessRecord {
70    pub fn current_pid(&self) -> Option<u32> {
71        if let Some(handle) = &self.pid_handle {
72            let pid = handle.load(Ordering::SeqCst);
73            if pid != 0 {
74                return Some(pid);
75            }
76        }
77        self.pid
78    }
79}
80
81/// State snapshot stored for process bookkeeping.
82#[derive(Clone, Debug, Serialize, Deserialize)]
83pub struct State {
84    #[serde(with = "time::serde::rfc3339")]
85    pub created_at: OffsetDateTime,
86    #[serde(with = "time::serde::rfc3339")]
87    pub updated_at: OffsetDateTime,
88    pub last_block_height: Option<u64>,
89    pub processes: Vec<ProcessRecord>,
90    #[serde(skip)]
91    path: PathBuf,
92}
93
94impl State {
95    pub async fn new(path: PathBuf) -> Result<Self> {
96        let now = OffsetDateTime::now_utc();
97        let state = Self {
98            created_at: now,
99            updated_at: now,
100            last_block_height: None,
101            processes: Vec::new(),
102            path,
103        };
104        state.persist().await?;
105        Ok(state)
106    }
107
108    pub async fn touch(&mut self) -> Result<()> {
109        self.updated_at = OffsetDateTime::now_utc();
110        self.persist().await
111    }
112
113    async fn persist(&self) -> Result<()> {
114        ensure_parent(&self.path).await?;
115        let mut snapshot = self.clone();
116        for process in &mut snapshot.processes {
117            process.pid = process.current_pid();
118            process.pid_handle = None;
119            process.shutdown_handle = None;
120        }
121        let contents = serde_json::to_string_pretty(&snapshot)?;
122        write_atomic(&self.path, contents).await
123    }
124}
125
126pub async fn spawn_pid_sync_tasks(state: Arc<Mutex<State>>) {
127    let tracked = {
128        let state = state.lock().await;
129        state
130            .processes
131            .iter()
132            .filter_map(|process| {
133                process
134                    .pid_handle
135                    .as_ref()
136                    .map(|handle| (process.id.clone(), Arc::clone(handle)))
137            })
138            .collect::<Vec<_>>()
139    };
140
141    for (process_id, pid_handle) in tracked {
142        spawn_pid_sync_task(Arc::clone(&state), process_id, pid_handle);
143    }
144}
145
146pub async fn spawn_pid_sync_tasks_for_ids(state: Arc<Mutex<State>>, process_ids: &[String]) {
147    let tracked = {
148        let state = state.lock().await;
149        state
150            .processes
151            .iter()
152            .filter(|process| process_ids.iter().any(|id| id == &process.id))
153            .filter_map(|process| {
154                process
155                    .pid_handle
156                    .as_ref()
157                    .map(|handle| (process.id.clone(), Arc::clone(handle)))
158            })
159            .collect::<Vec<_>>()
160    };
161
162    for (process_id, pid_handle) in tracked {
163        spawn_pid_sync_task(Arc::clone(&state), process_id, pid_handle);
164    }
165}
166
167fn spawn_pid_sync_task(state: Arc<Mutex<State>>, process_id: String, pid_handle: Arc<AtomicU32>) {
168    tokio::spawn(async move {
169        let mut last_seen = Some(u32::MAX);
170        loop {
171            let current_pid = {
172                let pid = pid_handle.load(Ordering::SeqCst);
173                (pid != 0).then_some(pid)
174            };
175            let should_exit;
176
177            if current_pid != last_seen {
178                last_seen = current_pid;
179                let mut state = state.lock().await;
180                let Some(process) = state
181                    .processes
182                    .iter_mut()
183                    .find(|process| process.id == process_id)
184                else {
185                    return;
186                };
187
188                process.pid = current_pid;
189                should_exit =
190                    !matches!(process.last_status, ProcessStatus::Running) && current_pid.is_none();
191
192                if let Err(error) = state.touch().await {
193                    warn!(
194                        %error,
195                        process_id,
196                        "failed to persist updated process pid"
197                    );
198                    return;
199                }
200            } else {
201                let state = state.lock().await;
202                let Some(process) = state
203                    .processes
204                    .iter()
205                    .find(|process| process.id == process_id)
206                else {
207                    return;
208                };
209                should_exit =
210                    !matches!(process.last_status, ProcessStatus::Running) && current_pid.is_none();
211            }
212
213            if should_exit {
214                return;
215            }
216
217            sleep(Duration::from_millis(100)).await;
218        }
219    });
220}
221
222async fn ensure_parent(path: &Path) -> Result<()> {
223    if let Some(parent) = path.parent() {
224        tokio_fs::create_dir_all(parent).await?;
225    }
226    Ok(())
227}
228
229async fn write_atomic(path: &Path, contents: String) -> Result<()> {
230    let suffix = SystemTime::now()
231        .duration_since(UNIX_EPOCH)
232        .map(|duration| duration.as_nanos())
233        .unwrap_or(0);
234    let tmp_path = atomic_tmp_path(path, suffix);
235    tokio_fs::write(&tmp_path, contents).await?;
236    if let Err(err) = tokio_fs::rename(&tmp_path, path).await {
237        let _ = tokio_fs::remove_file(&tmp_path).await;
238        return Err(err.into());
239    }
240    Ok(())
241}
242
243fn atomic_tmp_path(path: &Path, suffix: u128) -> PathBuf {
244    let parent = path
245        .parent()
246        .filter(|parent| !parent.as_os_str().is_empty())
247        .unwrap_or_else(|| Path::new("."));
248    let base_name = path
249        .file_name()
250        .unwrap_or_else(|| OsStr::new(STATE_FILE_NAME))
251        .to_string_lossy();
252    parent.join(format!(
253        ".{}.{}.{}.tmp",
254        base_name,
255        std::process::id(),
256        suffix
257    ))
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;
    use tempfile::TempDir;

    /// A `Running` record for `node-1` with the given persisted pid and optional live handle.
    fn sample_record(pid: Option<u32>, pid_handle: Option<Arc<AtomicU32>>) -> ProcessRecord {
        ProcessRecord {
            id: String::from("node-1"),
            node_id: 1,
            kind: ProcessKind::Node,
            group: ProcessGroup::Validators1,
            command: String::from("/tmp/casper-node"),
            args: vec![String::from("validator")],
            cwd: String::from("/tmp/network"),
            pid,
            pid_handle,
            shutdown_handle: None,
            stdout_path: String::from("/tmp/stdout.log"),
            stderr_path: String::from("/tmp/stderr.log"),
            started_at: None,
            stopped_at: None,
            exit_code: None,
            exit_signal: None,
            last_status: ProcessStatus::Running,
        }
    }

    /// Reads `processes[0].pid` from the JSON state file at `path`.
    async fn persisted_pid(path: &Path) -> Option<u64> {
        let raw = tokio_fs::read_to_string(path).await.unwrap();
        let json = serde_json::from_str::<Value>(&raw).unwrap();
        json["processes"][0]["pid"].as_u64()
    }

    #[test]
    fn atomic_tmp_path_uses_state_file_parent() {
        let target = Path::new("/tmp/devnet/networks/casper-dev/state.json");
        let tmp = atomic_tmp_path(target, 42);

        assert_eq!(tmp.parent(), target.parent());
        let name = tmp.file_name().unwrap().to_string_lossy();
        assert!(name.starts_with(".state.json."));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn touch_persists_current_pid_from_handle() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("state.json");
        let live_pid = Arc::new(AtomicU32::new(4242));

        let mut state = State::new(path.clone()).await.unwrap();
        state
            .processes
            .push(sample_record(Some(1111), Some(Arc::clone(&live_pid))));
        state.touch().await.unwrap();

        // The live handle's value must take precedence over the stale persisted pid.
        assert_eq!(persisted_pid(&path).await, Some(4242));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn pid_sync_task_updates_state_when_pid_changes() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("state.json");
        let live_pid = Arc::new(AtomicU32::new(5001));

        let mut state = State::new(path.clone()).await.unwrap();
        state
            .processes
            .push(sample_record(Some(5001), Some(Arc::clone(&live_pid))));
        state.touch().await.unwrap();

        let shared = Arc::new(Mutex::new(state));
        spawn_pid_sync_tasks(Arc::clone(&shared)).await;
        live_pid.store(6002, Ordering::SeqCst);

        // Check up to 50 times, 20 ms apart, for the background task to persist the new pid.
        let mut synced = false;
        for _ in 0..50 {
            if persisted_pid(&path).await == Some(6002) {
                synced = true;
                break;
            }
            sleep(Duration::from_millis(20)).await;
        }
        assert!(
            synced,
            "timed out waiting for pid sync task to persist updated pid"
        );
    }
}