posemesh_node_registration/
state.rs

1use anyhow::{anyhow, Result};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::{Mutex, MutexGuard, OnceLock};
8use std::time::{Duration, Instant};
9
10pub const STATE_PATH: &str = "data/registration_state.json";
11
12pub const STATUS_REGISTERING: &str = "registering";
13pub const STATUS_REGISTERED: &str = "registered";
14pub const STATUS_DISCONNECTED: &str = "disconnected";
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RegistrationState {
18    pub status: String,
19    #[serde(default, with = "chrono::serde::ts_seconds_option")]
20    pub last_healthcheck: Option<DateTime<Utc>>,
21}
22
23impl Default for RegistrationState {
24    fn default() -> Self {
25        Self {
26            status: STATUS_DISCONNECTED.to_string(),
27            last_healthcheck: None,
28        }
29    }
30}
31
32static STATE_STORE: OnceLock<Mutex<HashMap<PathBuf, RegistrationState>>> = OnceLock::new();
33
34fn state_store() -> &'static Mutex<HashMap<PathBuf, RegistrationState>> {
35    STATE_STORE.get_or_init(|| Mutex::new(HashMap::new()))
36}
37
38fn lock_state_store() -> Result<MutexGuard<'static, HashMap<PathBuf, RegistrationState>>> {
39    state_store()
40        .lock()
41        .map_err(|_| anyhow!("registration state store poisoned"))
42}
43
44pub fn read_state_from_path(path: &Path) -> Result<RegistrationState> {
45    let store = lock_state_store()?;
46    Ok(store.get(path).cloned().unwrap_or_default())
47}
48
49pub fn write_state_to_path(path: &Path, st: &RegistrationState) -> Result<()> {
50    let mut store = lock_state_store()?;
51    store.insert(path.to_path_buf(), st.clone());
52    Ok(())
53}
54
55pub fn read_state() -> Result<RegistrationState> {
56    read_state_from_path(Path::new(STATE_PATH))
57}
58
59pub fn write_state(st: &RegistrationState) -> Result<()> {
60    write_state_to_path(Path::new(STATE_PATH), st)
61}
62
63pub fn set_status(new_status: &str) -> Result<()> {
64    let mut st = read_state()?;
65    st.status = new_status.to_string();
66    write_state(&st)
67}
68
69pub fn touch_healthcheck_now() -> Result<()> {
70    let mut st = read_state()?;
71    st.last_healthcheck = Some(Utc::now());
72    write_state(&st)
73}
74
75pub struct LockGuard;
76
77impl LockGuard {
78    pub fn try_acquire(stale_after: Duration) -> std::io::Result<Option<Self>> {
79        let mut state = lock_lock_store()?;
80        let now = Instant::now();
81
82        if let Some(acquired_at) = state.acquired_at {
83            if now.duration_since(acquired_at) <= stale_after {
84                return Ok(None);
85            }
86        }
87
88        state.acquired_at = Some(now);
89        state.owner_pid = Some(std::process::id());
90
91        Ok(Some(Self))
92    }
93}
94
95impl Drop for LockGuard {
96    fn drop(&mut self) {
97        if let Ok(mut state) = lock_lock_store() {
98            state.acquired_at = None;
99            state.owner_pid = None;
100        }
101    }
102}
103
104#[derive(Default)]
105struct LockState {
106    acquired_at: Option<Instant>,
107    owner_pid: Option<u32>,
108}
109
110static LOCK_STATE: OnceLock<Mutex<LockState>> = OnceLock::new();
111
112fn lock_store() -> &'static Mutex<LockState> {
113    LOCK_STATE.get_or_init(|| Mutex::new(LockState::default()))
114}
115
116fn lock_lock_store() -> io::Result<MutexGuard<'static, LockState>> {
117    lock_store()
118        .lock()
119        .map_err(|_| io::Error::other("registration lock store poisoned"))
120}
121
122#[cfg(test)]
123mod tests {
124    use super::*;
125
126    #[test]
127    fn default_state_is_disconnected() {
128        let st = RegistrationState::default();
129        assert_eq!(st.status, STATUS_DISCONNECTED);
130        assert!(st.last_healthcheck.is_none());
131    }
132}