posemesh_node_registration/
register.rs

1use crate::crypto::{
2    format_timestamp_nanos, load_secp256k1_privhex, secp256k1_pubkey_uncompressed_hex,
3    sign_recoverable_keccak_hex,
4};
5use crate::state::{
6    read_state, set_status, touch_healthcheck_now, LockGuard, RegistrationState,
7    STATUS_DISCONNECTED, STATUS_REGISTERED, STATUS_REGISTERING,
8};
9use anyhow::{anyhow, Context, Result};
10use chrono::Utc;
11use rand::Rng;
12use reqwest::Client;
13use secp256k1::SecretKey;
14use serde::Serialize;
15use std::time::{Duration, Instant};
16use tracing::{debug, info, warn};
17
18#[derive(Debug, Serialize)]
19pub struct NodeRegistrationRequest {
20    pub url: String,
21    pub version: String,
22    pub registration_credentials: String,
23    pub signature: String,
24    pub timestamp: String,
25    pub public_key: String,
26    pub capabilities: Vec<String>,
27}
28
/// Builds the full URL of the DDS node-registration endpoint.
///
/// Every trailing `/` on `dds_base_url` is stripped before the fixed path is
/// appended, so the base may be given with or without a trailing slash.
fn registration_endpoint(dds_base_url: &str) -> String {
    const REGISTER_PATH: &str = "/internal/v1/nodes/register";

    let trimmed = dds_base_url.trim_end_matches('/');
    let mut endpoint = String::with_capacity(trimmed.len() + REGISTER_PATH.len());
    endpoint.push_str(trimmed);
    endpoint.push_str(REGISTER_PATH);
    endpoint
}
33
34fn build_registration_request(
35    node_url: &str,
36    node_version: &str,
37    reg_secret: &str,
38    sk: &SecretKey,
39    capabilities: &[String],
40) -> NodeRegistrationRequest {
41    let ts = format_timestamp_nanos(Utc::now());
42    let msg = format!("{}{}", node_url, ts);
43    let signature = sign_recoverable_keccak_hex(sk, msg.as_bytes());
44    let public_key = secp256k1_pubkey_uncompressed_hex(sk);
45    let registration_credentials = reg_secret.to_owned();
46    NodeRegistrationRequest {
47        url: node_url.to_owned(),
48        version: node_version.to_owned(),
49        registration_credentials,
50        signature,
51        timestamp: ts,
52        public_key,
53        capabilities: capabilities.to_vec(),
54    }
55}
56
57pub async fn register_once(
58    dds_base_url: &str,
59    node_url: &str,
60    node_version: &str,
61    reg_secret: &str,
62    sk: &SecretKey,
63    client: &Client,
64    capabilities: &[String],
65) -> Result<()> {
66    let req = build_registration_request(node_url, node_version, reg_secret, sk, capabilities);
67    let endpoint = registration_endpoint(dds_base_url);
68
69    let pk_short = req.public_key.get(0..16).unwrap_or(&req.public_key);
70    info!(
71        url = req.url,
72        version = req.version,
73        public_key_prefix = pk_short,
74        capabilities = ?req.capabilities,
75        "Registering node with DDS"
76    );
77
78    let res = client
79        .post(&endpoint)
80        .json(&req)
81        .send()
82        .await
83        .with_context(|| format!("POST {} failed", endpoint))?;
84
85    if res.status().is_success() {
86        debug!(status = ?res.status(), "Registration ok");
87        Ok(())
88    } else {
89        let status = res.status();
90        let body_snippet = match res.text().await {
91            Ok(mut text) => {
92                if text.len() > 512 {
93                    text.truncate(512);
94                }
95                text.replace('\n', " ")
96            }
97            Err(_) => "<unavailable>".to_string(),
98        };
99        Err(anyhow!(
100            "registration failed: status {}, endpoint {}, body_snippet: {}",
101            status,
102            endpoint,
103            body_snippet
104        ))
105    }
106}
107
108#[derive(Debug)]
109pub struct RegistrationConfig {
110    pub dds_base_url: String,
111    pub node_url: String,
112    pub node_version: String,
113    pub reg_secret: String,
114    pub secp256k1_privhex: String,
115    pub client: Client,
116    pub register_interval_secs: u64,
117    pub max_retry: i32,
118    pub capabilities: Vec<String>,
119}
120
121pub async fn run_registration_loop(cfg: RegistrationConfig) {
122    let RegistrationConfig {
123        dds_base_url,
124        node_url,
125        node_version,
126        reg_secret,
127        secp256k1_privhex,
128        client,
129        register_interval_secs,
130        max_retry,
131        capabilities,
132    } = cfg;
133    let sk = match load_secp256k1_privhex(&secp256k1_privhex) {
134        Ok(k) => k,
135        Err(e) => {
136            warn!("Invalid secp256k1 private key (redacted): {}", e);
137            return;
138        }
139    };
140
141    let healthcheck_ttl = Duration::from_secs(register_interval_secs.max(1));
142    let lock_stale_after = {
143        let base = healthcheck_ttl.saturating_mul(2);
144        let min = Duration::from_secs(30);
145        let max = Duration::from_secs(600);
146        if base < min {
147            min
148        } else if base > max {
149            max
150        } else {
151            base
152        }
153    };
154
155    fn timer_interval_secs(attempt: i32) -> u64 {
156        if attempt <= 0 {
157            return 0;
158        }
159        let p = 2_i64.saturating_pow(attempt as u32);
160        p.clamp(0, 60) as u64
161    }
162
163    let _ = set_status(read_state().map(|s| s.status).unwrap_or_default().as_str());
164
165    let mut attempt: i32 = 0;
166    let mut next_sleep = Duration::from_secs(1);
167
168    info!(
169        event = "registration.loop.start",
170        healthcheck_ttl_sec = healthcheck_ttl.as_secs() as i64,
171        node_url = %node_url,
172        node_version = %node_version,
173        "registration loop started"
174    );
175
176    loop {
177        tokio::time::sleep(next_sleep).await;
178        let RegistrationState {
179            status,
180            last_healthcheck,
181        } = read_state().unwrap_or_default();
182
183        match status.as_str() {
184            STATUS_DISCONNECTED | STATUS_REGISTERING => {
185                let lock_guard = match LockGuard::try_acquire(lock_stale_after) {
186                    Ok(Some(g)) => {
187                        info!(event = "lock.acquired", "registration lock acquired");
188                        Some(g)
189                    }
190                    Ok(None) => {
191                        debug!(event = "lock.busy", "another registrar is active");
192                        next_sleep = healthcheck_ttl;
193                        continue;
194                    }
195                    Err(e) => {
196                        warn!(event = "lock.error", error = %e, "could not acquire lock");
197                        next_sleep = healthcheck_ttl;
198                        continue;
199                    }
200                };
201
202                if status.as_str() == STATUS_DISCONNECTED {
203                    if let Err(e) = set_status(STATUS_REGISTERING) {
204                        warn!(event = "status.transition.error", error = %e);
205                    } else {
206                        info!(
207                            event = "status.transition",
208                            from = STATUS_DISCONNECTED,
209                            to = STATUS_REGISTERING,
210                            "moved to registering"
211                        );
212                    }
213                }
214
215                attempt += 1;
216
217                let start = Instant::now();
218                let res = register_once(
219                    &dds_base_url,
220                    &node_url,
221                    &node_version,
222                    &reg_secret,
223                    &sk,
224                    &client,
225                    &capabilities,
226                )
227                .await;
228                let elapsed_ms = start.elapsed().as_millis();
229
230                match res {
231                    Ok(()) => {
232                        let _ = set_status(STATUS_REGISTERED);
233                        let _ = touch_healthcheck_now();
234                        info!(
235                            event = "registration.success",
236                            elapsed_ms = elapsed_ms as i64,
237                            "successfully registered to DDS"
238                        );
239                        attempt = 0;
240                        next_sleep = healthcheck_ttl;
241                        drop(lock_guard);
242                    }
243                    Err(e) => {
244                        warn!(
245                            event = "registration.error",
246                            elapsed_ms = elapsed_ms as i64,
247                            error = %e,
248                            error_debug = ?e,
249                            attempt = attempt,
250                            "registration to DDS failed; will back off"
251                        );
252                        if max_retry >= 0 && attempt >= max_retry {
253                            warn!(
254                                event = "registration.max_retry_reached",
255                                max_retry = max_retry,
256                                "max retry reached; pausing until next TTL window"
257                            );
258                            attempt = 0;
259                            next_sleep = healthcheck_ttl;
260                            drop(lock_guard);
261                            continue;
262                        }
263                        let base = Duration::from_secs(timer_interval_secs(attempt));
264                        let jitter_factor: f64 = rand::thread_rng().gen_range(0.8..=1.2);
265                        next_sleep =
266                            Duration::from_secs_f64(base.as_secs_f64() * jitter_factor.max(0.1));
267                        drop(lock_guard);
268                    }
269                }
270            }
271            STATUS_REGISTERED => {
272                let elapsed = last_healthcheck
273                    .map(|t| Utc::now() - t)
274                    .map(|d| d.to_std().unwrap_or_default())
275                    .unwrap_or_else(|| Duration::from_secs(u64::MAX / 2));
276
277                if elapsed > healthcheck_ttl {
278                    info!(
279                        event = "healthcheck.expired",
280                        elapsed_since_healthcheck_sec = elapsed.as_secs() as i64,
281                        "healthcheck TTL exceeded; re-entering registering"
282                    );
283                    let _ = set_status(STATUS_REGISTERING);
284                    next_sleep = Duration::from_secs(1);
285                } else {
286                    next_sleep = healthcheck_ttl;
287                }
288            }
289            other => {
290                warn!(
291                    event = "status.unknown",
292                    status = other,
293                    "unknown status; resetting to disconnected"
294                );
295                let _ = set_status(STATUS_DISCONNECTED);
296                next_sleep = Duration::from_secs(1);
297            }
298        }
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::load_secp256k1_privhex;
    use parking_lot::Mutex as PLMutex;
    use std::io;
    use std::sync::Arc;
    use tracing::subscriber;
    use tracing_subscriber::layer::SubscriberExt;

    /// Byte buffer shared between the tracing writer and the test body.
    type SharedLog = Arc<PLMutex<Vec<u8>>>;

    /// `io::Write` sink that appends everything it receives to a shared buffer.
    struct BufWriter(SharedLog);

    impl io::Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let mut sink = self.0.lock();
            sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            // Nothing is buffered outside the shared vector.
            Ok(())
        }
    }

    /// Hands out `BufWriter`s that all write into the same shared buffer.
    struct MakeBufWriter(SharedLog);

    impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for MakeBufWriter {
        type Writer = BufWriter;

        fn make_writer(&'a self) -> Self::Writer {
            BufWriter(Arc::clone(&self.0))
        }
    }

    /// Runs one registration attempt with log capture enabled and checks that
    /// the registration secret never shows up in the captured output.
    #[tokio::test]
    async fn logs_do_not_include_secret() {
        let sink: SharedLog = Arc::new(PLMutex::new(Vec::new()));
        let fmt_layer = tracing_subscriber::fmt::layer()
            .with_writer(MakeBufWriter(Arc::clone(&sink)))
            .with_ansi(false)
            .without_time();
        let _guard = subscriber::set_default(tracing_subscriber::registry().with(fmt_layer));

        let secret = "my-super-secret";
        let dds = "http://127.0.0.1:9";
        let url = "https://node.example.com";
        let version = "1.2.3";
        let sk = load_secp256k1_privhex(
            "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace",
        )
        .unwrap();
        let client = reqwest::Client::builder()
            .no_proxy()
            .timeout(Duration::from_millis(200))
            .build()
            .unwrap();
        let capabilities: Vec<String> = [
            "/reconstruction/global-refinement/v1",
            "/reconstruction/local-refinement/v1",
        ]
        .iter()
        .map(|c| c.to_string())
        .collect();

        // The outcome of the call is irrelevant here; only the emitted logs matter.
        let _ = register_once(dds, url, version, secret, &sk, &client, &capabilities).await;

        let log_bytes = sink.lock().clone();
        let captured = String::from_utf8(log_bytes).unwrap_or_default();
        assert!(captured.contains("Registering node with DDS"));
        assert!(
            !captured.contains(secret),
            "logs leaked secret: {}",
            captured
        );
    }
}