posemesh_node_registration/
register.rs1use crate::crypto::{derive_eth_address, load_secp256k1_privhex, sign_eip191_recoverable_hex};
2use crate::state::{
3 read_state, set_status, LockGuard, RegistrationState, STATUS_DISCONNECTED, STATUS_REGISTERED,
4 STATUS_REGISTERING,
5};
6use anyhow::{anyhow, Context, Result};
7use rand::Rng;
8use reqwest::Client;
9use secp256k1::SecretKey;
10use serde::{Deserialize, Serialize};
11use std::time::{Duration, Instant};
12use tracing::{debug, info, warn};
13
14#[derive(Debug, Serialize)]
15pub struct NodeRegisterWalletRequest {
16 pub message: String,
17 pub signature: String,
18 pub registration_credentials: String,
19 pub capabilities: Vec<String>,
20 pub version: String,
21}
22
23#[derive(Debug, Deserialize)]
24struct SiweRequestMeta {
25 pub nonce: Option<String>,
26 pub domain: Option<String>,
27 pub uri: Option<String>,
28 pub version: Option<String>,
29 #[serde(rename = "chainId")]
30 pub chain_id: Option<i64>,
31 #[serde(rename = "issuedAt")]
32 pub issued_at: Option<String>,
33}
34
/// Returns the wallet-registration URL for the given DDS base URL.
///
/// Any trailing `/` characters on `dds_base_url` are removed before the fixed
/// route is appended, so the result never contains a doubled slash at the join.
fn registration_endpoint(dds_base_url: &str) -> String {
    const ROUTE: &str = "/internal/v1/nodes/register-wallet";
    [dds_base_url.trim_end_matches('/'), ROUTE].concat()
}
39
/// Returns the SIWE request URL for the given DDS base URL.
///
/// Any trailing `/` characters on `dds_base_url` are removed before the fixed
/// route is appended, so the result never contains a doubled slash at the join.
fn siwe_request_endpoint(dds_base_url: &str) -> String {
    const ROUTE: &str = "/internal/v1/auth/siwe/request";
    [dds_base_url.trim_end_matches('/'), ROUTE].concat()
}
44
45async fn request_siwe_meta(
46 dds_base_url: &str,
47 wallet: &str,
48 client: &Client,
49) -> Result<SiweRequestMeta> {
50 let endpoint = siwe_request_endpoint(dds_base_url);
51 let res = client
52 .post(&endpoint)
53 .json(&serde_json::json!({ "wallet": wallet }))
54 .send()
55 .await
56 .with_context(|| format!("POST {} failed", endpoint))?
57 .error_for_status()
58 .with_context(|| format!("POST {} returned error", endpoint))?;
59 let body: SiweRequestMeta = res.json().await?;
60 if body.nonce.as_deref().unwrap_or("").is_empty() {
61 return Err(anyhow!("siwe nonce missing in response"));
62 }
63 Ok(body)
64}
65
66fn compose_message(meta: &SiweRequestMeta, address: &str) -> Result<String> {
67 let domain = meta
68 .domain
69 .as_deref()
70 .ok_or_else(|| anyhow!("siwe domain missing"))?;
71 let uri = meta
72 .uri
73 .as_deref()
74 .ok_or_else(|| anyhow!("siwe uri missing"))?;
75 let version = meta
76 .version
77 .as_deref()
78 .ok_or_else(|| anyhow!("siwe version missing"))?;
79 let chain_id = meta.chain_id.ok_or_else(|| anyhow!("siwe chain id missing"))?;
80 let nonce = meta
81 .nonce
82 .as_deref()
83 .ok_or_else(|| anyhow!("siwe nonce missing"))?;
84 let issued_at = meta
85 .issued_at
86 .as_deref()
87 .ok_or_else(|| anyhow!("siwe issued_at missing"))?;
88
89 let mut out = String::new();
90 out.push_str(&format!(
91 "{} wants you to sign in with your Ethereum account:\n",
92 domain
93 ));
94 out.push_str(address);
95 out.push_str("\n\n");
96 out.push_str(&format!("URI: {}\n", uri));
97 out.push_str(&format!("Version: {}\n", version));
98 out.push_str(&format!("Chain ID: {}\n", chain_id));
99 out.push_str(&format!("Nonce: {}\n", nonce));
100 out.push_str(&format!("Issued At: {}", issued_at));
101 Ok(out)
102}
103
104pub async fn register_once(
105 dds_base_url: &str,
106 node_version: &str,
107 reg_secret: &str,
108 sk: &SecretKey,
109 client: &Client,
110 capabilities: &[String],
111) -> Result<()> {
112 if capabilities.is_empty() {
113 return Err(anyhow!(
114 "capabilities must be non-empty for DDS registration"
115 ));
116 }
117 let wallet = derive_eth_address(sk);
118 let wallet_prefix = wallet.get(0..10).unwrap_or(&wallet);
119 info!(
120 wallet_prefix = wallet_prefix,
121 version = node_version,
122 capabilities = ?capabilities,
123 "Registering node with DDS (SIWE)"
124 );
125
126 let meta = request_siwe_meta(dds_base_url, &wallet, client).await?;
127 let message = compose_message(&meta, &wallet)?;
128 let signature = sign_eip191_recoverable_hex(sk, &message);
129 let req = NodeRegisterWalletRequest {
130 message,
131 signature,
132 registration_credentials: reg_secret.to_owned(),
133 capabilities: capabilities.to_vec(),
134 version: node_version.to_owned(),
135 };
136 let endpoint = registration_endpoint(dds_base_url);
137
138 let res = client
139 .post(&endpoint)
140 .json(&req)
141 .send()
142 .await
143 .with_context(|| format!("POST {} failed", endpoint))?;
144
145 if res.status().is_success() {
146 debug!(status = ?res.status(), "Registration ok");
147 Ok(())
148 } else {
149 let status = res.status();
150 let body_snippet = match res.text().await {
151 Ok(mut text) => {
152 if text.len() > 512 {
153 text.truncate(512);
154 }
155 text.replace('\n', " ")
156 }
157 Err(_) => "<unavailable>".to_string(),
158 };
159 Err(anyhow!(
160 "registration failed: status {}, endpoint {}, body_snippet: {}",
161 status,
162 endpoint,
163 body_snippet
164 ))
165 }
166}
167
168#[derive(Debug)]
169pub struct RegistrationConfig {
170 pub dds_base_url: String,
171 pub node_version: String,
172 pub reg_secret: String,
173 pub secp256k1_privhex: String,
174 pub client: Client,
175 pub register_interval_secs: u64,
176 pub max_retry: i32,
177 pub capabilities: Vec<String>,
178}
179
180pub async fn run_registration_loop(cfg: RegistrationConfig) {
181 let RegistrationConfig {
182 dds_base_url,
183 node_version,
184 reg_secret,
185 secp256k1_privhex,
186 client,
187 register_interval_secs,
188 max_retry,
189 capabilities,
190 } = cfg;
191 let sk = match load_secp256k1_privhex(&secp256k1_privhex) {
192 Ok(k) => k,
193 Err(e) => {
194 warn!("Invalid secp256k1 private key (redacted): {}", e);
195 return;
196 }
197 };
198
199 let register_interval = Duration::from_secs(register_interval_secs.max(1));
200 let lock_stale_after = {
201 let base = register_interval.saturating_mul(2);
202 let min = Duration::from_secs(30);
203 let max = Duration::from_secs(600);
204 if base < min {
205 min
206 } else if base > max {
207 max
208 } else {
209 base
210 }
211 };
212
213 fn timer_interval_secs(attempt: i32) -> u64 {
214 if attempt <= 0 {
215 return 0;
216 }
217 let p = 2_i64.saturating_pow(attempt as u32);
218 p.clamp(0, 60) as u64
219 }
220
221 let _ = set_status(read_state().map(|s| s.status).unwrap_or_default().as_str());
222
223 let mut attempt: i32 = 0;
224 let mut next_sleep = Duration::from_secs(1);
225
226 info!(
227 event = "registration.loop.start",
228 register_interval_sec = register_interval.as_secs() as i64,
229 node_version = %node_version,
230 "registration loop started"
231 );
232
233 loop {
234 tokio::time::sleep(next_sleep).await;
235 let RegistrationState { status, .. } = read_state().unwrap_or_default();
236
237 match status.as_str() {
238 STATUS_DISCONNECTED | STATUS_REGISTERING | STATUS_REGISTERED => {
239 let lock_guard = match LockGuard::try_acquire(lock_stale_after) {
240 Ok(Some(g)) => {
241 info!(event = "lock.acquired", "registration lock acquired");
242 Some(g)
243 }
244 Ok(None) => {
245 debug!(event = "lock.busy", "another registrar is active");
246 next_sleep = register_interval;
247 continue;
248 }
249 Err(e) => {
250 warn!(event = "lock.error", error = %e, "could not acquire lock");
251 next_sleep = register_interval;
252 continue;
253 }
254 };
255
256 if status.as_str() == STATUS_DISCONNECTED {
257 if let Err(e) = set_status(STATUS_REGISTERING) {
258 warn!(event = "status.transition.error", error = %e);
259 } else {
260 info!(
261 event = "status.transition",
262 from = STATUS_DISCONNECTED,
263 to = STATUS_REGISTERING,
264 "moved to registering"
265 );
266 }
267 } else if status.as_str() == STATUS_REGISTERED {
268 if let Err(e) = set_status(STATUS_REGISTERING) {
269 warn!(event = "status.transition.error", error = %e);
270 } else {
271 info!(
272 event = "status.transition",
273 from = STATUS_REGISTERED,
274 to = STATUS_REGISTERING,
275 "refreshing registration"
276 );
277 }
278 }
279
280 attempt += 1;
281
282 let start = Instant::now();
283 let res = register_once(
284 &dds_base_url,
285 &node_version,
286 ®_secret,
287 &sk,
288 &client,
289 &capabilities,
290 )
291 .await;
292 let elapsed_ms = start.elapsed().as_millis();
293
294 match res {
295 Ok(()) => {
296 let _ = set_status(STATUS_REGISTERED);
297 info!(
298 event = "registration.success",
299 elapsed_ms = elapsed_ms as i64,
300 "successfully registered to DDS"
301 );
302 attempt = 0;
303 next_sleep = register_interval;
304 drop(lock_guard);
305 }
306 Err(e) => {
307 warn!(
308 event = "registration.error",
309 elapsed_ms = elapsed_ms as i64,
310 error = %e,
311 error_debug = ?e,
312 attempt = attempt,
313 "registration to DDS failed; will back off"
314 );
315 if max_retry >= 0 && attempt >= max_retry {
316 warn!(
317 event = "registration.max_retry_reached",
318 max_retry = max_retry,
319 "max retry reached; pausing until next TTL window"
320 );
321 attempt = 0;
322 next_sleep = register_interval;
323 drop(lock_guard);
324 continue;
325 }
326 let base = Duration::from_secs(timer_interval_secs(attempt));
327 let jitter_factor: f64 = rand::thread_rng().gen_range(0.8..=1.2);
328 next_sleep =
329 Duration::from_secs_f64(base.as_secs_f64() * jitter_factor.max(0.1));
330 drop(lock_guard);
331 }
332 }
333 }
334 other => {
335 warn!(
336 event = "status.unknown",
337 status = other,
338 "unknown status; resetting to disconnected"
339 );
340 let _ = set_status(STATUS_DISCONNECTED);
341 next_sleep = Duration::from_secs(1);
342 }
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::load_secp256k1_privhex;
    use parking_lot::Mutex as PLMutex;
    use std::io;
    use std::sync::Arc;
    use tracing::subscriber;
    use tracing_subscriber::layer::SubscriberExt;

    /// Shared in-memory buffer that collects everything the log layer writes.
    type LogSink = Arc<PLMutex<Vec<u8>>>;

    /// Writer handle that appends every chunk to the shared sink.
    struct SinkWriter(LogSink);

    impl io::Write for SinkWriter {
        fn write(&mut self, chunk: &[u8]) -> io::Result<usize> {
            let mut sink = self.0.lock();
            sink.extend_from_slice(chunk);
            Ok(chunk.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            // Nothing is buffered outside the sink itself.
            Ok(())
        }
    }

    /// Hands out a fresh [`SinkWriter`] over the same sink for each log event.
    struct SinkWriterFactory(LogSink);

    impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SinkWriterFactory {
        type Writer = SinkWriter;

        fn make_writer(&'a self) -> Self::Writer {
            SinkWriter(Arc::clone(&self.0))
        }
    }

    /// A registration attempt must log its start banner but never the
    /// registration secret, whatever the outcome of the attempt.
    #[tokio::test]
    async fn logs_do_not_include_secret() {
        let sink: LogSink = Arc::new(PLMutex::new(Vec::new()));
        let fmt_layer = tracing_subscriber::fmt::layer()
            .with_writer(SinkWriterFactory(Arc::clone(&sink)))
            .with_ansi(false)
            .without_time();
        let _guard = subscriber::set_default(tracing_subscriber::registry().with(fmt_layer));

        let secret = "my-super-secret";
        let dds = "http://127.0.0.1:9";
        let version = "1.2.3";
        let sk = load_secp256k1_privhex(
            "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace",
        )
        .unwrap();
        let client = reqwest::Client::builder()
            .no_proxy()
            .timeout(Duration::from_millis(200))
            .build()
            .unwrap();
        let capabilities: Vec<String> = [
            "/reconstruction/global-refinement/v1",
            "/reconstruction/local-refinement/v1",
        ]
        .iter()
        .map(|c| c.to_string())
        .collect();

        // The result is irrelevant here; only the emitted log lines matter.
        let _ = register_once(dds, version, secret, &sk, &client, &capabilities).await;

        let raw = sink.lock().clone();
        let captured = String::from_utf8(raw).unwrap_or_default();
        assert!(captured.contains("Registering node with DDS"));
        assert!(
            !captured.contains(secret),
            "logs leaked secret: {}",
            captured
        );
    }
}