1use crate::crypto::{derive_eth_address, load_secp256k1_privhex, sign_eip191_recoverable_hex};
2use crate::state::{
3 read_state, set_status, LockGuard, RegistrationState, STATUS_DISCONNECTED, STATUS_REGISTERED,
4 STATUS_REGISTERING,
5};
6use anyhow::anyhow;
7use rand::Rng;
8use reqwest::{Client, StatusCode};
9use secp256k1::SecretKey;
10use serde::{Deserialize, Serialize};
11use std::time::{Duration, Instant};
12use tracing::{debug, info, warn};
13
/// Delay between state polls while the persisted status is "registered" and the
/// registration loop has nothing to do.
const PARKED_POLL_INTERVAL: Duration = Duration::from_secs(1);
/// Minimum spacing (three minutes) between repeated `warn!` logs for conflict and
/// slow-retry failures; attempts in between are logged at `debug!` instead.
const RATE_LIMITED_WARN_INTERVAL: Duration = Duration::from_secs(3 * 60);
16
/// JSON body that `register_once` POSTs to the register-wallet endpoint.
#[derive(Debug, Serialize)]
pub struct NodeRegisterWalletRequest {
    /// The SIWE message text that was signed (built by `compose_message`).
    pub message: String,
    /// Output of `sign_eip191_recoverable_hex` over `message`.
    pub signature: String,
    /// The registration secret supplied by the caller; never log this value.
    pub registration_credentials: String,
    /// Capability identifiers advertised by the node.
    pub capabilities: Vec<String>,
    /// Version string of the node software.
    pub version: String,
}
25
26#[derive(Debug, Deserialize)]
27struct SiweRequestMeta {
28 pub nonce: Option<String>,
29 pub domain: Option<String>,
30 pub uri: Option<String>,
31 pub version: Option<String>,
32 #[serde(rename = "chainId")]
33 pub chain_id: Option<i64>,
34 #[serde(rename = "issuedAt")]
35 pub issued_at: Option<String>,
36}
37
/// Returns the register-wallet URL for `dds_base_url`, ignoring any trailing slashes
/// on the base so the result never contains a doubled `/` at the join.
fn registration_endpoint(dds_base_url: &str) -> String {
    const PATH: &str = "/internal/v1/nodes/register-wallet";
    [dds_base_url.trim_end_matches('/'), PATH].concat()
}
42
/// Returns the SIWE nonce-request URL for `dds_base_url`, ignoring any trailing slashes
/// on the base so the result never contains a doubled `/` at the join.
fn siwe_request_endpoint(dds_base_url: &str) -> String {
    const PATH: &str = "/internal/v1/auth/siwe/request";
    [dds_base_url.trim_end_matches('/'), PATH].concat()
}
47
48async fn request_siwe_meta(
49 dds_base_url: &str,
50 wallet: &str,
51 client: &Client,
52) -> std::result::Result<SiweRequestMeta, RegistrationAttempt> {
53 let endpoint = siwe_request_endpoint(dds_base_url);
54 let res = client
55 .post(&endpoint)
56 .json(&serde_json::json!({ "wallet": wallet }))
57 .send()
58 .await
59 .map_err(|err| {
60 RegistrationAttempt::retryable_failure(format!(
61 "request SIWE nonce failed: endpoint {}, error: {}",
62 endpoint, err
63 ))
64 })?;
65 let status = res.status();
66 if !status.is_success() {
67 let body_snippet = response_body_snippet(res).await;
68 return Err(classify_http_status(
69 status,
70 format!(
71 "request SIWE nonce failed: status {}, endpoint {}, body_snippet: {}",
72 status, endpoint, body_snippet
73 ),
74 ));
75 }
76 let body: SiweRequestMeta = res.json().await.map_err(|err| {
77 RegistrationAttempt::retryable_failure(format!(
78 "decode SIWE nonce response failed: endpoint {}, error: {}",
79 endpoint, err
80 ))
81 })?;
82 if body.nonce.as_deref().unwrap_or("").is_empty() {
83 return Err(RegistrationAttempt::retryable_failure(
84 "siwe nonce missing in response".to_string(),
85 ));
86 }
87 Ok(body)
88}
89
90fn compose_message(meta: &SiweRequestMeta, address: &str) -> anyhow::Result<String> {
91 let domain = meta
92 .domain
93 .as_deref()
94 .ok_or_else(|| anyhow!("siwe domain missing"))?;
95 let uri = meta
96 .uri
97 .as_deref()
98 .ok_or_else(|| anyhow!("siwe uri missing"))?;
99 let version = meta
100 .version
101 .as_deref()
102 .ok_or_else(|| anyhow!("siwe version missing"))?;
103 let chain_id = meta
104 .chain_id
105 .ok_or_else(|| anyhow!("siwe chain id missing"))?;
106 let nonce = meta
107 .nonce
108 .as_deref()
109 .ok_or_else(|| anyhow!("siwe nonce missing"))?;
110 let issued_at = meta
111 .issued_at
112 .as_deref()
113 .ok_or_else(|| anyhow!("siwe issued_at missing"))?;
114
115 let mut out = String::new();
116 out.push_str(&format!(
117 "{} wants you to sign in with your Ethereum account:\n",
118 domain
119 ));
120 out.push_str(address);
121 out.push_str("\n\n");
122 out.push_str(&format!("URI: {}\n", uri));
123 out.push_str(&format!("Version: {}\n", version));
124 out.push_str(&format!("Chain ID: {}\n", chain_id));
125 out.push_str(&format!("Nonce: {}\n", nonce));
126 out.push_str(&format!("Issued At: {}", issued_at));
127 Ok(out)
128}
129
/// Outcome category of a single registration attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegistrationAttemptKind {
    /// The register-wallet request was answered with a success status.
    Registered,
    /// A request was answered with HTTP 409 Conflict.
    Conflict,
    /// A failure that the registration loop retries with exponential backoff.
    RetryableFailure,
    /// A failure that the registration loop retries only after the register interval.
    SlowRetryFailure,
}

/// Result of one registration attempt: its category plus an optional error description.
#[derive(Debug, Clone)]
pub struct RegistrationAttempt {
    kind: RegistrationAttemptKind,
    error: Option<String>,
}

impl RegistrationAttempt {
    /// Builds a non-success attempt of the given `kind` described by `error`.
    fn failed(kind: RegistrationAttemptKind, error: String) -> Self {
        Self {
            kind,
            error: Some(error),
        }
    }

    /// Successful attempt; carries no error text.
    fn registered() -> Self {
        Self {
            kind: RegistrationAttemptKind::Registered,
            error: None,
        }
    }

    /// Attempt rejected with a conflict.
    fn conflict(error: String) -> Self {
        Self::failed(RegistrationAttemptKind::Conflict, error)
    }

    /// Attempt that failed in a way worth retrying with backoff.
    fn retryable_failure(error: String) -> Self {
        Self::failed(RegistrationAttemptKind::RetryableFailure, error)
    }

    /// Attempt that failed in a way that should only be retried after a cooldown.
    fn slow_retry_failure(error: String) -> Self {
        Self::failed(RegistrationAttemptKind::SlowRetryFailure, error)
    }

    /// Error description, or the empty string when the attempt carries none.
    fn error_text(&self) -> &str {
        match &self.error {
            Some(text) => text.as_str(),
            None => "",
        }
    }
}
177
178fn classify_http_status(status: StatusCode, error: String) -> RegistrationAttempt {
179 match status {
180 StatusCode::CONFLICT => RegistrationAttempt::conflict(error),
181 StatusCode::REQUEST_TIMEOUT
182 | StatusCode::TOO_MANY_REQUESTS
183 | StatusCode::BAD_GATEWAY
184 | StatusCode::SERVICE_UNAVAILABLE
185 | StatusCode::GATEWAY_TIMEOUT => RegistrationAttempt::retryable_failure(error),
186 s if s.is_server_error() => RegistrationAttempt::retryable_failure(error),
187 _ => RegistrationAttempt::slow_retry_failure(error),
188 }
189}
190
191async fn response_body_snippet(res: reqwest::Response) -> String {
192 match res.text().await {
193 Ok(mut text) => {
194 if text.len() > 512 {
195 text.truncate(512);
196 }
197 text.replace('\n', " ")
198 }
199 Err(_) => "<unavailable>".to_string(),
200 }
201}
202
203pub async fn register_once(
204 dds_base_url: &str,
205 node_version: &str,
206 reg_secret: &str,
207 sk: &SecretKey,
208 client: &Client,
209 capabilities: &[String],
210) -> RegistrationAttempt {
211 if capabilities.is_empty() {
212 return RegistrationAttempt::slow_retry_failure(
213 "capabilities must be non-empty for DDS registration".to_string(),
214 );
215 }
216 let wallet = derive_eth_address(sk);
217 let wallet_prefix = wallet.get(0..10).unwrap_or(&wallet);
218 info!(
219 wallet_prefix = wallet_prefix,
220 version = node_version,
221 capabilities = ?capabilities,
222 "Registering node with DDS (SIWE)"
223 );
224
225 let meta = match request_siwe_meta(dds_base_url, &wallet, client).await {
226 Ok(meta) => meta,
227 Err(attempt) => return attempt,
228 };
229 let message = match compose_message(&meta, &wallet) {
230 Ok(message) => message,
231 Err(err) => {
232 return RegistrationAttempt::retryable_failure(format!(
233 "compose SIWE message failed: {}",
234 err
235 ));
236 }
237 };
238 let signature = sign_eip191_recoverable_hex(sk, &message);
239 let req = NodeRegisterWalletRequest {
240 message,
241 signature,
242 registration_credentials: reg_secret.to_owned(),
243 capabilities: capabilities.to_vec(),
244 version: node_version.to_owned(),
245 };
246 let endpoint = registration_endpoint(dds_base_url);
247
248 let res = client
249 .post(&endpoint)
250 .json(&req)
251 .send()
252 .await
253 .map_err(|err| {
254 RegistrationAttempt::retryable_failure(format!(
255 "registration request failed: endpoint {}, error: {}",
256 endpoint, err
257 ))
258 });
259
260 let res = match res {
261 Ok(res) => res,
262 Err(attempt) => return attempt,
263 };
264
265 if res.status().is_success() {
266 debug!(status = ?res.status(), "Registration ok");
267 RegistrationAttempt::registered()
268 } else {
269 let status = res.status();
270 let body_snippet = response_body_snippet(res).await;
271 classify_http_status(
272 status,
273 format!(
274 "registration failed: status {}, endpoint {}, body_snippet: {}",
275 status, endpoint, body_snippet
276 ),
277 )
278 }
279}
280
281#[derive(Debug)]
282pub struct RegistrationConfig {
283 pub dds_base_url: String,
284 pub node_version: String,
285 pub reg_secret: String,
286 pub secp256k1_privhex: String,
287 pub client: Client,
288 pub register_interval_secs: u64,
289 pub max_retry: i32,
290 pub capabilities: Vec<String>,
291}
292
293pub async fn run_registration_loop(cfg: RegistrationConfig) {
294 let RegistrationConfig {
295 dds_base_url,
296 node_version,
297 reg_secret,
298 secp256k1_privhex,
299 client,
300 register_interval_secs,
301 max_retry,
302 capabilities,
303 } = cfg;
304 let sk = match load_secp256k1_privhex(&secp256k1_privhex) {
305 Ok(k) => k,
306 Err(e) => {
307 warn!("Invalid secp256k1 private key (redacted): {}", e);
308 return;
309 }
310 };
311
312 let register_interval = Duration::from_secs(register_interval_secs.max(1));
313 let lock_stale_after = {
314 let base = register_interval.saturating_mul(2);
315 let min = Duration::from_secs(30);
316 let max = Duration::from_secs(600);
317 if base < min {
318 min
319 } else if base > max {
320 max
321 } else {
322 base
323 }
324 };
325
326 fn timer_interval_secs(attempt: i32) -> u64 {
327 if attempt <= 0 {
328 return 0;
329 }
330 let p = 2_i64.saturating_pow(attempt as u32);
331 p.clamp(0, 60) as u64
332 }
333
334 let _ = set_status(read_state().map(|s| s.status).unwrap_or_default().as_str());
335
336 let mut transient_attempt: i32 = 0;
337 let mut next_sleep = Duration::ZERO;
338 let mut conflict_episode_started_at: Option<Instant> = None;
339 let mut next_conflict_warn_at: Option<Instant> = None;
340 let mut last_slow_warn_at: Option<Instant> = None;
341
342 info!(
343 event = "registration.loop.start",
344 register_interval_sec = register_interval.as_secs() as i64,
345 node_version = %node_version,
346 "registration loop started"
347 );
348
349 loop {
350 tokio::time::sleep(next_sleep).await;
351 let RegistrationState { status, .. } = read_state().unwrap_or_default();
352
353 match status.as_str() {
354 STATUS_REGISTERED => {
355 next_sleep = PARKED_POLL_INTERVAL;
356 continue;
357 }
358 STATUS_DISCONNECTED | STATUS_REGISTERING => {
359 let lock_guard = match LockGuard::try_acquire(lock_stale_after) {
360 Ok(Some(g)) => {
361 info!(event = "lock.acquired", "registration lock acquired");
362 Some(g)
363 }
364 Ok(None) => {
365 debug!(event = "lock.busy", "another registrar is active");
366 next_sleep = register_interval;
367 continue;
368 }
369 Err(e) => {
370 warn!(event = "lock.error", error = %e, "could not acquire lock");
371 next_sleep = register_interval;
372 continue;
373 }
374 };
375
376 if status.as_str() == STATUS_DISCONNECTED {
377 if let Err(e) = set_status(STATUS_REGISTERING) {
378 warn!(event = "status.transition.error", error = %e);
379 } else {
380 info!(
381 event = "status.transition",
382 from = STATUS_DISCONNECTED,
383 to = STATUS_REGISTERING,
384 "moved to registering"
385 );
386 }
387 }
388 let start = Instant::now();
389 let attempt = register_once(
390 &dds_base_url,
391 &node_version,
392 ®_secret,
393 &sk,
394 &client,
395 &capabilities,
396 )
397 .await;
398 let elapsed_ms = start.elapsed().as_millis();
399
400 match attempt.kind {
401 RegistrationAttemptKind::Registered => {
402 let _ = set_status(STATUS_REGISTERED);
403 info!(
404 event = "registration.success",
405 elapsed_ms = elapsed_ms as i64,
406 "successfully registered to DDS"
407 );
408 transient_attempt = 0;
409 conflict_episode_started_at = None;
410 next_conflict_warn_at = None;
411 last_slow_warn_at = None;
412 next_sleep = PARKED_POLL_INTERVAL;
413 drop(lock_guard);
414 }
415 RegistrationAttemptKind::Conflict => {
416 transient_attempt = 0;
417 last_slow_warn_at = None;
418 let now = Instant::now();
419 let should_warn = match next_conflict_warn_at {
420 Some(deadline) => now >= deadline,
421 None => true,
422 };
423 let blocked_ms = if let Some(started_at) = conflict_episode_started_at {
424 now.duration_since(started_at).as_millis() as i64
425 } else {
426 0
427 };
428 if conflict_episode_started_at.is_none() {
429 conflict_episode_started_at = Some(now);
430 }
431 if should_warn {
432 next_conflict_warn_at = Some(now + RATE_LIMITED_WARN_INTERVAL);
433 warn!(
434 event = "registration.conflict",
435 elapsed_ms = elapsed_ms as i64,
436 blocked_ms,
437 error = attempt.error_text(),
438 "registration blocked by an existing online node; will retry after cooldown"
439 );
440 } else {
441 debug!(
442 event = "registration.conflict",
443 elapsed_ms = elapsed_ms as i64,
444 blocked_ms,
445 error = attempt.error_text(),
446 "registration still blocked by an existing online node"
447 );
448 }
449 next_sleep = register_interval;
450 drop(lock_guard);
451 }
452 RegistrationAttemptKind::RetryableFailure => {
453 conflict_episode_started_at = None;
454 next_conflict_warn_at = None;
455 transient_attempt += 1;
456 warn!(
457 event = "registration.error",
458 elapsed_ms = elapsed_ms as i64,
459 error = attempt.error_text(),
460 attempt = transient_attempt,
461 "registration to DDS failed; will back off"
462 );
463 if max_retry >= 0 && transient_attempt >= max_retry {
464 warn!(
465 event = "registration.max_retry_reached",
466 max_retry = max_retry,
467 "max retry reached; pausing until next TTL window"
468 );
469 transient_attempt = 0;
470 next_sleep = register_interval;
471 drop(lock_guard);
472 continue;
473 }
474 let base = Duration::from_secs(timer_interval_secs(transient_attempt));
475 let jitter_factor: f64 = rand::thread_rng().gen_range(0.8..=1.2);
476 next_sleep =
477 Duration::from_secs_f64(base.as_secs_f64() * jitter_factor.max(0.1));
478 drop(lock_guard);
479 }
480 RegistrationAttemptKind::SlowRetryFailure => {
481 transient_attempt = 0;
482 conflict_episode_started_at = None;
483 next_conflict_warn_at = None;
484 let now = Instant::now();
485 let should_warn = match last_slow_warn_at {
486 Some(deadline) => {
487 now.duration_since(deadline) >= RATE_LIMITED_WARN_INTERVAL
488 }
489 None => true,
490 };
491 if should_warn {
492 last_slow_warn_at = Some(now);
493 warn!(
494 event = "registration.error",
495 elapsed_ms = elapsed_ms as i64,
496 error = attempt.error_text(),
497 "registration to DDS failed; will retry after cooldown"
498 );
499 } else {
500 debug!(
501 event = "registration.error",
502 elapsed_ms = elapsed_ms as i64,
503 error = attempt.error_text(),
504 "registration to DDS still blocked by a non-retryable error"
505 );
506 }
507 next_sleep = register_interval;
508 drop(lock_guard);
509 }
510 }
511 }
512 other => {
513 warn!(
514 event = "status.unknown",
515 status = other,
516 "unknown status; resetting to disconnected"
517 );
518 let _ = set_status(STATUS_DISCONNECTED);
519 next_sleep = Duration::from_secs(1);
520 }
521 }
522 }
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::load_secp256k1_privhex;
    use crate::state::{clear_node_secret, write_state, RegistrationState};
    use axum::{http::StatusCode, routing::post, Router};
    use parking_lot::Mutex as PLMutex;
    use std::io;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::sync::OnceLock;
    use tokio::net::TcpListener;
    use tracing::subscriber;
    use tracing_subscriber::layer::SubscriberExt;

    /// Hex private key used as the signing key in every test.
    const TEST_PRIVHEX: &str = "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace";

    /// In-memory sink shared between the tracing writer and the assertions.
    type LogBuffer = Arc<PLMutex<Vec<u8>>>;

    /// `io::Write` adapter that appends everything to a shared [`LogBuffer`].
    struct BufWriter(LogBuffer);

    impl io::Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.lock().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Hands out [`BufWriter`]s that all write to the same buffer.
    struct MakeBufWriter(LogBuffer);

    impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for MakeBufWriter {
        type Writer = BufWriter;

        fn make_writer(&'a self) -> Self::Writer {
            BufWriter(Arc::clone(&self.0))
        }
    }

    /// Process-wide lock taken by every test; presumably needed because the tests share
    /// the persisted registration state.
    fn test_lock() -> &'static PLMutex<()> {
        static TEST_LOCK: OnceLock<PLMutex<()>> = OnceLock::new();
        TEST_LOCK.get_or_init(|| PLMutex::new(()))
    }

    /// Clears the node secret and rewrites the registration state with its default value.
    fn reset_registration_state() {
        clear_node_secret().unwrap();
        write_state(&RegistrationState::default()).unwrap();
    }

    /// Binds an ephemeral localhost port and serves `app` on a background task.
    ///
    /// Returns the bound address and the handle of the server task (abort it when done).
    async fn spawn_server(app: Router) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let handle = tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });
        (addr, handle)
    }

    #[tokio::test(flavor = "current_thread")]
    async fn logs_do_not_include_secret() {
        let _serial = test_lock().lock();
        reset_registration_state();

        // Capture everything the tracing subscriber emits during this test.
        let captured_bytes: LogBuffer = Arc::new(PLMutex::new(Vec::new()));
        let layer = tracing_subscriber::fmt::layer()
            .with_writer(MakeBufWriter(Arc::clone(&captured_bytes)))
            .with_ansi(false)
            .without_time();
        let _subscriber_guard =
            subscriber::set_default(tracing_subscriber::registry().with(layer));

        let secret = "my-super-secret";
        let sk = load_secp256k1_privhex(TEST_PRIVHEX).unwrap();
        let client = reqwest::Client::builder()
            .no_proxy()
            .timeout(Duration::from_millis(200))
            .build()
            .unwrap();
        let capabilities = vec![
            "/reconstruction/global-refinement/v1".to_string(),
            "/reconstruction/local-refinement/v1".to_string(),
        ];

        // The outcome is irrelevant here; only the emitted logs are inspected.
        let _ = register_once(
            "http://127.0.0.1:9",
            "1.2.3",
            secret,
            &sk,
            &client,
            &capabilities,
        )
        .await;

        let captured = String::from_utf8(captured_bytes.lock().clone()).unwrap_or_default();
        assert!(captured.contains("Registering node with DDS"));
        assert!(
            !captured.contains(secret),
            "logs leaked secret: {}",
            captured
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn register_once_classifies_conflict() {
        let _serial = test_lock().lock();
        reset_registration_state();

        async fn conflict_handler() -> StatusCode {
            StatusCode::CONFLICT
        }

        // Both endpoints answer 409, so the attempt must be classified as a conflict.
        let app = Router::new()
            .route("/internal/v1/auth/siwe/request", post(conflict_handler))
            .route("/internal/v1/nodes/register-wallet", post(conflict_handler));
        let (addr, server) = spawn_server(app).await;

        let client = reqwest::Client::builder().no_proxy().build().unwrap();
        let sk = load_secp256k1_privhex(TEST_PRIVHEX).unwrap();
        let base_url = format!("http://{}", addr);
        let capabilities = ["/cap/example/v1".to_string()];

        let attempt =
            register_once(&base_url, "1.2.3", "secret", &sk, &client, &capabilities).await;

        assert_eq!(attempt.kind, RegistrationAttemptKind::Conflict);
        server.abort();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn registration_loop_parks_after_success() {
        let _serial = test_lock().lock();
        reset_registration_state();

        let request_hits = Arc::new(AtomicUsize::new(0));
        let register_hits = Arc::new(AtomicUsize::new(0));

        async fn nonce_handler() -> axum::Json<serde_json::Value> {
            axum::Json(serde_json::json!({
                "nonce": "abc12345",
                "domain": "dds.example.com",
                "uri": "https://dds.example.com",
                "version": "1",
                "chainId": 8453,
                "issuedAt": "2026-01-01T00:00:00Z"
            }))
        }

        // Each route counts how often it is hit so the test can prove the loop parks.
        let count_nonce_requests = {
            let hits = Arc::clone(&request_hits);
            move || {
                let hits = Arc::clone(&hits);
                async move {
                    hits.fetch_add(1, Ordering::SeqCst);
                    nonce_handler().await
                }
            }
        };
        let count_registrations = {
            let hits = Arc::clone(&register_hits);
            move || {
                let hits = Arc::clone(&hits);
                async move {
                    hits.fetch_add(1, Ordering::SeqCst);
                    StatusCode::OK
                }
            }
        };
        let app = Router::new()
            .route("/internal/v1/auth/siwe/request", post(count_nonce_requests))
            .route(
                "/internal/v1/nodes/register-wallet",
                post(count_registrations),
            );
        let (addr, server) = spawn_server(app).await;

        let cfg = RegistrationConfig {
            dds_base_url: format!("http://{}", addr),
            node_version: "1.2.3".to_string(),
            reg_secret: "secret".to_string(),
            secp256k1_privhex: TEST_PRIVHEX.to_string(),
            client: reqwest::Client::builder().no_proxy().build().unwrap(),
            register_interval_secs: 1,
            max_retry: -1,
            capabilities: vec!["/cap/example/v1".to_string()],
        };

        let handle = tokio::spawn(async move {
            run_registration_loop(cfg).await;
        });

        // The first attempt starts immediately; give it time to complete.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert_eq!(read_state().unwrap().status, STATUS_REGISTERED);

        // Wait past one register interval: a parked loop must not contact DDS again.
        tokio::time::sleep(Duration::from_millis(1300)).await;
        assert_eq!(request_hits.load(Ordering::SeqCst), 1);
        assert_eq!(register_hits.load(Ordering::SeqCst), 1);

        handle.abort();
        server.abort();
    }
}