posemesh_node_registration/
state.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::fs::{self, File};
5use std::io::Write;
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8
9pub const STATE_PATH: &str = "data/registration_state.json";
10pub const LOCK_PATH: &str = "data/registration.lock";
11
12pub const STATUS_REGISTERING: &str = "registering";
13pub const STATUS_REGISTERED: &str = "registered";
14pub const STATUS_DISCONNECTED: &str = "disconnected";
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RegistrationState {
18    pub status: String,
19    #[serde(default, with = "chrono::serde::ts_seconds_option")]
20    pub last_healthcheck: Option<DateTime<Utc>>,
21}
22
23impl Default for RegistrationState {
24    fn default() -> Self {
25        Self {
26            status: STATUS_DISCONNECTED.to_string(),
27            last_healthcheck: None,
28        }
29    }
30}
31
32fn tmp_path_for(path: &Path) -> PathBuf {
33    let mut p = PathBuf::from(path);
34    let base = p
35        .file_name()
36        .and_then(|n| n.to_str())
37        .unwrap_or("registration_state.json");
38    let pid = std::process::id();
39    let nonce = uuid::Uuid::new_v4();
40    let tmp = format!("{}.tmp.{}.{}", base, pid, nonce);
41    p.set_file_name(tmp);
42    p
43}
44
45pub fn read_state_from_path(path: &Path) -> Result<RegistrationState> {
46    match fs::read_to_string(path) {
47        Ok(s) => {
48            let st: RegistrationState =
49                serde_json::from_str(&s).with_context(|| format!("decode {}", path.display()))?;
50            Ok(st)
51        }
52        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(RegistrationState::default()),
53        Err(e) => Err(e).with_context(|| format!("read {}", path.display())),
54    }
55}
56
57pub fn write_state_to_path(path: &Path, st: &RegistrationState) -> Result<()> {
58    if let Some(parent) = path.parent() {
59        fs::create_dir_all(parent).with_context(|| format!("create dir {}", parent.display()))?;
60    }
61    let tmp = tmp_path_for(path);
62    let encoded = serde_json::to_vec_pretty(st).context("encode state json")?;
63    let mut f = File::create(&tmp).with_context(|| format!("create tmp {}", tmp.display()))?;
64    f.write_all(&encoded)
65        .with_context(|| format!("write tmp {}", tmp.display()))?;
66    f.sync_all().ok();
67    drop(f);
68    match fs::rename(&tmp, path) {
69        Ok(()) => {}
70        Err(_e) => {
71            let _ = fs::remove_file(path);
72            fs::rename(&tmp, path)
73                .with_context(|| format!("rename {} -> {}", tmp.display(), path.display()))?;
74        }
75    }
76    if let Some(parent) = path.parent() {
77        if let Ok(dir) = File::open(parent) {
78            let _ = dir.sync_all();
79        }
80    }
81    Ok(())
82}
83
84pub fn read_state() -> Result<RegistrationState> {
85    read_state_from_path(Path::new(STATE_PATH))
86}
87
88pub fn write_state(st: &RegistrationState) -> Result<()> {
89    write_state_to_path(Path::new(STATE_PATH), st)
90}
91
92pub fn set_status(new_status: &str) -> Result<()> {
93    let mut st = read_state()?;
94    st.status = new_status.to_string();
95    write_state(&st)
96}
97
98pub fn touch_healthcheck_now() -> Result<()> {
99    let mut st = read_state()?;
100    st.last_healthcheck = Some(Utc::now());
101    write_state(&st)
102}
103
104pub struct LockGuard {
105    path: std::path::PathBuf,
106    _file: fs::File,
107}
108
109impl LockGuard {
110    pub fn try_acquire(stale_after: Duration) -> std::io::Result<Option<Self>> {
111        let path = Path::new(LOCK_PATH);
112        if let Some(parent) = path.parent() {
113            let _ = fs::create_dir_all(parent);
114        }
115
116        match fs::OpenOptions::new()
117            .write(true)
118            .create_new(true)
119            .open(path)
120        {
121            Ok(mut f) => {
122                let now = chrono::Utc::now().to_rfc3339();
123                let _ = writeln!(f, "created_at={}, pid={}", now, std::process::id());
124                Ok(Some(Self {
125                    path: path.to_path_buf(),
126                    _file: f,
127                }))
128            }
129            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
130                if let Ok(meta) = fs::metadata(path) {
131                    if let Ok(modified) = meta.modified() {
132                        if let Ok(age) = modified.elapsed() {
133                            if age > stale_after {
134                                let _ = fs::remove_file(path);
135                                if let Ok(mut f2) = fs::OpenOptions::new()
136                                    .write(true)
137                                    .create_new(true)
138                                    .open(path)
139                                {
140                                    let now = chrono::Utc::now().to_rfc3339();
141                                    let _ = writeln!(
142                                        f2,
143                                        "created_at={}, pid={}",
144                                        now,
145                                        std::process::id()
146                                    );
147                                    return Ok(Some(Self {
148                                        path: path.to_path_buf(),
149                                        _file: f2,
150                                    }));
151                                }
152                            }
153                        }
154                    }
155                }
156                Ok(None)
157            }
158            Err(e) => Err(e),
159        }
160    }
161}
162
163impl Drop for LockGuard {
164    fn drop(&mut self) {
165        let _ = fs::remove_file(&self.path);
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172
173    #[test]
174    fn default_state_is_disconnected() {
175        let st = RegistrationState::default();
176        assert_eq!(st.status, STATUS_DISCONNECTED);
177        assert!(st.last_healthcheck.is_none());
178    }
179}