posemesh_node_registration/
state.rs

1use anyhow::{anyhow, Result};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::sync::{Mutex, MutexGuard, OnceLock};
8use std::time::{Duration, Instant};
9
/// Path identifying the default registration state.
///
/// Within this file it is only used as the key into the in-memory state store
/// (see [`read_state`] and [`write_state`]); no file is opened at this path here.
pub const STATE_PATH: &str = "data/registration_state.json";
/// Path identifying the registration lock.
///
/// Within this file it is only used as the key into the in-memory lock store
/// (see [`LockGuard::try_acquire`]); no file is created at this path here.
pub const LOCK_PATH: &str = "data/registration.lock";
12
// Status strings intended for `RegistrationState::status`. Only `STATUS_DISCONNECTED`
// is referenced by the code visible in this file (as the default status); the exact
// moments at which callers set the other two are not shown here.

/// Status string "registering" — presumably used while a registration attempt is in
/// progress (NOTE(review): confirm against callers).
pub const STATUS_REGISTERING: &str = "registering";
/// Status string "registered" — presumably used once registration has succeeded
/// (NOTE(review): confirm against callers).
pub const STATUS_REGISTERED: &str = "registered";
/// Status string "disconnected"; this is the status of a default `RegistrationState`.
pub const STATUS_DISCONNECTED: &str = "disconnected";
16
/// Registration status of the node together with the time of its last healthcheck.
///
/// The default value has status [`STATUS_DISCONNECTED`] and no healthcheck timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistrationState {
    /// Free-form status string; [`set_status`] stores whatever value it is given.
    pub status: String,
    /// Time recorded by [`touch_healthcheck_now`], if any.
    ///
    /// (De)serialized through `chrono::serde::ts_seconds_option`; falls back to `None`
    /// when the field is missing from the input.
    #[serde(default, with = "chrono::serde::ts_seconds_option")]
    pub last_healthcheck: Option<DateTime<Utc>>,
}
23
24impl Default for RegistrationState {
25    fn default() -> Self {
26        Self {
27            status: STATUS_DISCONNECTED.to_string(),
28            last_healthcheck: None,
29        }
30    }
31}
32
/// Process-wide, lazily initialised map from a state path to the registration state
/// stored under it. Access goes through `state_store()` / `lock_state_store()`.
static STATE_STORE: OnceLock<Mutex<HashMap<PathBuf, RegistrationState>>> = OnceLock::new();
34
35fn state_store() -> &'static Mutex<HashMap<PathBuf, RegistrationState>> {
36    STATE_STORE.get_or_init(|| Mutex::new(HashMap::new()))
37}
38
39fn lock_state_store() -> Result<MutexGuard<'static, HashMap<PathBuf, RegistrationState>>> {
40    state_store()
41        .lock()
42        .map_err(|_| anyhow!("registration state store poisoned"))
43}
44
45pub fn read_state_from_path(path: &Path) -> Result<RegistrationState> {
46    let store = lock_state_store()?;
47    Ok(store.get(path).cloned().unwrap_or_default())
48}
49
50pub fn write_state_to_path(path: &Path, st: &RegistrationState) -> Result<()> {
51    let mut store = lock_state_store()?;
52    store.insert(path.to_path_buf(), st.clone());
53    Ok(())
54}
55
56pub fn read_state() -> Result<RegistrationState> {
57    read_state_from_path(Path::new(STATE_PATH))
58}
59
60pub fn write_state(st: &RegistrationState) -> Result<()> {
61    write_state_to_path(Path::new(STATE_PATH), st)
62}
63
64pub fn set_status(new_status: &str) -> Result<()> {
65    let mut st = read_state()?;
66    st.status = new_status.to_string();
67    write_state(&st)
68}
69
70pub fn touch_healthcheck_now() -> Result<()> {
71    let mut st = read_state()?;
72    st.last_healthcheck = Some(Utc::now());
73    write_state(&st)
74}
75
76pub struct LockGuard {
77    path: PathBuf,
78}
79
80impl LockGuard {
81    pub fn try_acquire(stale_after: Duration) -> std::io::Result<Option<Self>> {
82        let path = PathBuf::from(LOCK_PATH);
83        let mut locks = lock_lock_store()?;
84
85        let entry = locks.entry(path.clone()).or_default();
86        let now = Instant::now();
87
88        if let Some(acquired_at) = entry.acquired_at {
89            if now.duration_since(acquired_at) <= stale_after {
90                return Ok(None);
91            }
92        }
93
94        entry.acquired_at = Some(now);
95        entry.owner_pid = Some(std::process::id());
96
97        Ok(Some(Self { path }))
98    }
99}
100
101impl Drop for LockGuard {
102    fn drop(&mut self) {
103        if let Ok(mut locks) = lock_lock_store() {
104            if let Some(entry) = locks.get_mut(&self.path) {
105                entry.acquired_at = None;
106                entry.owner_pid = None;
107            }
108        }
109    }
110}
111
/// Bookkeeping for a single entry in the in-memory lock store.
#[derive(Default)]
struct LockState {
    /// Instant at which the lock was acquired; `None` while the lock is free.
    acquired_at: Option<Instant>,
    /// Process id recorded when the lock was acquired; `None` while the lock is free.
    /// It is written but not read by the code visible in this file.
    owner_pid: Option<u32>,
}
117
/// Process-wide, lazily initialised map from a lock path to its `LockState`.
/// Access goes through `lock_store()` / `lock_lock_store()`.
static LOCK_STORE: OnceLock<Mutex<HashMap<PathBuf, LockState>>> = OnceLock::new();
119
120fn lock_store() -> &'static Mutex<HashMap<PathBuf, LockState>> {
121    LOCK_STORE.get_or_init(|| Mutex::new(HashMap::new()))
122}
123
124fn lock_lock_store() -> io::Result<MutexGuard<'static, HashMap<PathBuf, LockState>>> {
125    lock_store()
126        .lock()
127        .map_err(|_| io::Error::other("registration lock store poisoned"))
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly defaulted state reports "disconnected" and has no healthcheck yet.
    #[test]
    fn default_state_is_disconnected() {
        let RegistrationState {
            status,
            last_healthcheck,
        } = RegistrationState::default();

        assert_eq!(status, STATUS_DISCONNECTED);
        assert!(last_healthcheck.is_none());
    }
}