posemesh_node_registration/
register.rs

1use crate::crypto::{derive_eth_address, load_secp256k1_privhex, sign_eip191_recoverable_hex};
2use crate::state::{
3    read_state, set_status, LockGuard, RegistrationState, STATUS_DISCONNECTED, STATUS_REGISTERED,
4    STATUS_REGISTERING,
5};
6use anyhow::{anyhow, Context, Result};
7use rand::Rng;
8use reqwest::Client;
9use secp256k1::SecretKey;
10use serde::{Deserialize, Serialize};
11use std::time::{Duration, Instant};
12use tracing::{debug, info, warn};
13
14#[derive(Debug, Serialize)]
15pub struct NodeRegisterWalletRequest {
16    pub message: String,
17    pub signature: String,
18    pub registration_credentials: String,
19    pub capabilities: Vec<String>,
20    pub version: String,
21}
22
23#[derive(Debug, Deserialize)]
24struct SiweRequestMeta {
25    pub nonce: Option<String>,
26    pub domain: Option<String>,
27    pub uri: Option<String>,
28    pub version: Option<String>,
29    #[serde(rename = "chainId")]
30    pub chain_id: Option<i64>,
31    #[serde(rename = "issuedAt")]
32    pub issued_at: Option<String>,
33}
34
/// Builds the wallet-registration URL for the given DDS base URL.
///
/// All trailing `/` characters of `dds_base_url` are stripped before the
/// fixed path is appended.
fn registration_endpoint(dds_base_url: &str) -> String {
    let mut url = dds_base_url.trim_end_matches('/').to_owned();
    url.push_str("/internal/v1/nodes/register-wallet");
    url
}
39
/// Builds the SIWE request URL for the given DDS base URL.
///
/// All trailing `/` characters of `dds_base_url` are stripped before the
/// fixed path is appended.
fn siwe_request_endpoint(dds_base_url: &str) -> String {
    let trimmed = dds_base_url.trim_end_matches('/');
    [trimmed, "/internal/v1/auth/siwe/request"].concat()
}
44
45async fn request_siwe_meta(
46    dds_base_url: &str,
47    wallet: &str,
48    client: &Client,
49) -> Result<SiweRequestMeta> {
50    let endpoint = siwe_request_endpoint(dds_base_url);
51    let res = client
52        .post(&endpoint)
53        .json(&serde_json::json!({ "wallet": wallet }))
54        .send()
55        .await
56        .with_context(|| format!("POST {} failed", endpoint))?
57        .error_for_status()
58        .with_context(|| format!("POST {} returned error", endpoint))?;
59    let body: SiweRequestMeta = res.json().await?;
60    if body.nonce.as_deref().unwrap_or("").is_empty() {
61        return Err(anyhow!("siwe nonce missing in response"));
62    }
63    Ok(body)
64}
65
66fn compose_message(meta: &SiweRequestMeta, address: &str) -> Result<String> {
67    let domain = meta
68        .domain
69        .as_deref()
70        .ok_or_else(|| anyhow!("siwe domain missing"))?;
71    let uri = meta
72        .uri
73        .as_deref()
74        .ok_or_else(|| anyhow!("siwe uri missing"))?;
75    let version = meta
76        .version
77        .as_deref()
78        .ok_or_else(|| anyhow!("siwe version missing"))?;
79    let chain_id = meta.chain_id.ok_or_else(|| anyhow!("siwe chain id missing"))?;
80    let nonce = meta
81        .nonce
82        .as_deref()
83        .ok_or_else(|| anyhow!("siwe nonce missing"))?;
84    let issued_at = meta
85        .issued_at
86        .as_deref()
87        .ok_or_else(|| anyhow!("siwe issued_at missing"))?;
88
89    let mut out = String::new();
90    out.push_str(&format!(
91        "{} wants you to sign in with your Ethereum account:\n",
92        domain
93    ));
94    out.push_str(address);
95    out.push_str("\n\n");
96    out.push_str(&format!("URI: {}\n", uri));
97    out.push_str(&format!("Version: {}\n", version));
98    out.push_str(&format!("Chain ID: {}\n", chain_id));
99    out.push_str(&format!("Nonce: {}\n", nonce));
100    out.push_str(&format!("Issued At: {}", issued_at));
101    Ok(out)
102}
103
104pub async fn register_once(
105    dds_base_url: &str,
106    node_version: &str,
107    reg_secret: &str,
108    sk: &SecretKey,
109    client: &Client,
110    capabilities: &[String],
111) -> Result<()> {
112    if capabilities.is_empty() {
113        return Err(anyhow!(
114            "capabilities must be non-empty for DDS registration"
115        ));
116    }
117    let wallet = derive_eth_address(sk);
118    let wallet_prefix = wallet.get(0..10).unwrap_or(&wallet);
119    info!(
120        wallet_prefix = wallet_prefix,
121        version = node_version,
122        capabilities = ?capabilities,
123        "Registering node with DDS (SIWE)"
124    );
125
126    let meta = request_siwe_meta(dds_base_url, &wallet, client).await?;
127    let message = compose_message(&meta, &wallet)?;
128    let signature = sign_eip191_recoverable_hex(sk, &message);
129    let req = NodeRegisterWalletRequest {
130        message,
131        signature,
132        registration_credentials: reg_secret.to_owned(),
133        capabilities: capabilities.to_vec(),
134        version: node_version.to_owned(),
135    };
136    let endpoint = registration_endpoint(dds_base_url);
137
138    let res = client
139        .post(&endpoint)
140        .json(&req)
141        .send()
142        .await
143        .with_context(|| format!("POST {} failed", endpoint))?;
144
145    if res.status().is_success() {
146        debug!(status = ?res.status(), "Registration ok");
147        Ok(())
148    } else {
149        let status = res.status();
150        let body_snippet = match res.text().await {
151            Ok(mut text) => {
152                if text.len() > 512 {
153                    text.truncate(512);
154                }
155                text.replace('\n', " ")
156            }
157            Err(_) => "<unavailable>".to_string(),
158        };
159        Err(anyhow!(
160            "registration failed: status {}, endpoint {}, body_snippet: {}",
161            status,
162            endpoint,
163            body_snippet
164        ))
165    }
166}
167
168#[derive(Debug)]
169pub struct RegistrationConfig {
170    pub dds_base_url: String,
171    pub node_version: String,
172    pub reg_secret: String,
173    pub secp256k1_privhex: String,
174    pub client: Client,
175    pub register_interval_secs: u64,
176    pub max_retry: i32,
177    pub capabilities: Vec<String>,
178}
179
180pub async fn run_registration_loop(cfg: RegistrationConfig) {
181    let RegistrationConfig {
182        dds_base_url,
183        node_version,
184        reg_secret,
185        secp256k1_privhex,
186        client,
187        register_interval_secs,
188        max_retry,
189        capabilities,
190    } = cfg;
191    let sk = match load_secp256k1_privhex(&secp256k1_privhex) {
192        Ok(k) => k,
193        Err(e) => {
194            warn!("Invalid secp256k1 private key (redacted): {}", e);
195            return;
196        }
197    };
198
199    let register_interval = Duration::from_secs(register_interval_secs.max(1));
200    let lock_stale_after = {
201        let base = register_interval.saturating_mul(2);
202        let min = Duration::from_secs(30);
203        let max = Duration::from_secs(600);
204        if base < min {
205            min
206        } else if base > max {
207            max
208        } else {
209            base
210        }
211    };
212
213    fn timer_interval_secs(attempt: i32) -> u64 {
214        if attempt <= 0 {
215            return 0;
216        }
217        let p = 2_i64.saturating_pow(attempt as u32);
218        p.clamp(0, 60) as u64
219    }
220
221    let _ = set_status(read_state().map(|s| s.status).unwrap_or_default().as_str());
222
223    let mut attempt: i32 = 0;
224    let mut next_sleep = Duration::from_secs(1);
225
226    info!(
227        event = "registration.loop.start",
228        register_interval_sec = register_interval.as_secs() as i64,
229        node_version = %node_version,
230        "registration loop started"
231    );
232
233    loop {
234        tokio::time::sleep(next_sleep).await;
235        let RegistrationState { status, .. } = read_state().unwrap_or_default();
236
237        match status.as_str() {
238            STATUS_DISCONNECTED | STATUS_REGISTERING | STATUS_REGISTERED => {
239                let lock_guard = match LockGuard::try_acquire(lock_stale_after) {
240                    Ok(Some(g)) => {
241                        info!(event = "lock.acquired", "registration lock acquired");
242                        Some(g)
243                    }
244                    Ok(None) => {
245                        debug!(event = "lock.busy", "another registrar is active");
246                        next_sleep = register_interval;
247                        continue;
248                    }
249                    Err(e) => {
250                        warn!(event = "lock.error", error = %e, "could not acquire lock");
251                        next_sleep = register_interval;
252                        continue;
253                    }
254                };
255
256                if status.as_str() == STATUS_DISCONNECTED {
257                    if let Err(e) = set_status(STATUS_REGISTERING) {
258                        warn!(event = "status.transition.error", error = %e);
259                    } else {
260                        info!(
261                            event = "status.transition",
262                            from = STATUS_DISCONNECTED,
263                            to = STATUS_REGISTERING,
264                            "moved to registering"
265                        );
266                    }
267                } else if status.as_str() == STATUS_REGISTERED {
268                    if let Err(e) = set_status(STATUS_REGISTERING) {
269                        warn!(event = "status.transition.error", error = %e);
270                    } else {
271                        info!(
272                            event = "status.transition",
273                            from = STATUS_REGISTERED,
274                            to = STATUS_REGISTERING,
275                            "refreshing registration"
276                        );
277                    }
278                }
279
280                attempt += 1;
281
282                let start = Instant::now();
283                let res = register_once(
284                    &dds_base_url,
285                    &node_version,
286                    &reg_secret,
287                    &sk,
288                    &client,
289                    &capabilities,
290                )
291                .await;
292                let elapsed_ms = start.elapsed().as_millis();
293
294                match res {
295                    Ok(()) => {
296                        let _ = set_status(STATUS_REGISTERED);
297                        info!(
298                            event = "registration.success",
299                            elapsed_ms = elapsed_ms as i64,
300                            "successfully registered to DDS"
301                        );
302                        attempt = 0;
303                        next_sleep = register_interval;
304                        drop(lock_guard);
305                    }
306                    Err(e) => {
307                        warn!(
308                            event = "registration.error",
309                            elapsed_ms = elapsed_ms as i64,
310                            error = %e,
311                            error_debug = ?e,
312                            attempt = attempt,
313                            "registration to DDS failed; will back off"
314                        );
315                        if max_retry >= 0 && attempt >= max_retry {
316                            warn!(
317                                event = "registration.max_retry_reached",
318                                max_retry = max_retry,
319                                "max retry reached; pausing until next TTL window"
320                            );
321                            attempt = 0;
322                            next_sleep = register_interval;
323                            drop(lock_guard);
324                            continue;
325                        }
326                        let base = Duration::from_secs(timer_interval_secs(attempt));
327                        let jitter_factor: f64 = rand::thread_rng().gen_range(0.8..=1.2);
328                        next_sleep =
329                            Duration::from_secs_f64(base.as_secs_f64() * jitter_factor.max(0.1));
330                        drop(lock_guard);
331                    }
332                }
333            }
334            other => {
335                warn!(
336                    event = "status.unknown",
337                    status = other,
338                    "unknown status; resetting to disconnected"
339                );
340                let _ = set_status(STATUS_DISCONNECTED);
341                next_sleep = Duration::from_secs(1);
342            }
343        }
344    }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::load_secp256k1_privhex;
    use parking_lot::Mutex as PLMutex;
    use std::io;
    use std::sync::Arc;
    use tracing::subscriber;
    use tracing_subscriber::layer::SubscriberExt;

    /// `io::Write` sink that appends everything to a shared byte buffer.
    struct BufWriter(Arc<PLMutex<Vec<u8>>>);

    impl io::Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let mut sink = self.0.lock();
            sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Hands out `BufWriter`s that all share one buffer.
    struct MakeBufWriter(Arc<PLMutex<Vec<u8>>>);

    impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for MakeBufWriter {
        type Writer = BufWriter;

        fn make_writer(&'a self) -> Self::Writer {
            BufWriter(Arc::clone(&self.0))
        }
    }

    /// Runs one registration attempt against an address nothing is expected
    /// to listen on and checks that the captured log output contains the
    /// start-of-registration line but not the registration secret.
    #[tokio::test]
    async fn logs_do_not_include_secret() {
        // Capture everything the fmt layer writes into `log_bytes`.
        let log_bytes = Arc::new(PLMutex::new(Vec::<u8>::new()));
        let fmt_layer = tracing_subscriber::fmt::layer()
            .with_writer(MakeBufWriter(Arc::clone(&log_bytes)))
            .with_ansi(false)
            .without_time();
        let _guard = subscriber::set_default(tracing_subscriber::registry().with(fmt_layer));

        let secret = "my-super-secret";
        let dds = "http://127.0.0.1:9";
        let version = "1.2.3";
        let sk = load_secp256k1_privhex(
            "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace",
        )
        .unwrap();
        let client = reqwest::Client::builder()
            .no_proxy()
            .timeout(Duration::from_millis(200))
            .build()
            .unwrap();
        let capabilities: Vec<String> = [
            "/reconstruction/global-refinement/v1",
            "/reconstruction/local-refinement/v1",
        ]
        .iter()
        .map(|c| c.to_string())
        .collect();

        // The outcome is irrelevant; only the emitted logs are inspected.
        let _ = register_once(dds, version, secret, &sk, &client, &capabilities).await;

        let raw = log_bytes.lock().clone();
        let captured = String::from_utf8(raw).unwrap_or_default();
        assert!(captured.contains("Registering node with DDS"));
        assert!(
            !captured.contains(secret),
            "logs leaked secret: {}",
            captured
        );
    }
}
413}