Skip to main content

posemesh_node_registration/
register.rs

1use crate::crypto::{derive_eth_address, load_secp256k1_privhex, sign_eip191_recoverable_hex};
2use crate::state::{
3    read_state, set_status, LockGuard, RegistrationState, STATUS_DISCONNECTED, STATUS_REGISTERED,
4    STATUS_REGISTERING,
5};
6use anyhow::anyhow;
7use rand::Rng;
8use reqwest::{Client, StatusCode};
9use secp256k1::SecretKey;
10use serde::{Deserialize, Serialize};
11use std::time::{Duration, Instant};
12use tracing::{debug, info, warn};
13
/// Sleep between status polls while the persisted status is already "registered"
/// (also used as the first sleep right after a successful registration).
const PARKED_POLL_INTERVAL: Duration = Duration::from_secs(1);
/// Minimum spacing between `warn!`-level logs for repeated conflict / slow-retry
/// failures; attempts that fall inside this window are logged at `debug!` instead.
const RATE_LIMITED_WARN_INTERVAL: Duration = Duration::from_secs(180);
16
17#[derive(Debug, Serialize)]
18pub struct NodeRegisterWalletRequest {
19    pub message: String,
20    pub signature: String,
21    pub registration_credentials: String,
22    pub capabilities: Vec<String>,
23    pub version: String,
24}
25
/// Response body of the DDS SIWE request endpoint.
///
/// Every field is optional at the deserialization level; `request_siwe_meta`
/// rejects a missing/empty `nonce`, and `compose_message` rejects any other
/// missing field when it builds the message to sign.
#[derive(Debug, Deserialize)]
struct SiweRequestMeta {
    /// Nonce issued by the server for this sign-in.
    pub nonce: Option<String>,
    /// Domain placed at the start of the first message line.
    pub domain: Option<String>,
    /// Value of the `URI:` message line.
    pub uri: Option<String>,
    /// Value of the `Version:` message line.
    pub version: Option<String>,
    /// Value of the `Chain ID:` message line (JSON key `chainId`).
    #[serde(rename = "chainId")]
    pub chain_id: Option<i64>,
    /// Value of the `Issued At:` message line (JSON key `issuedAt`).
    #[serde(rename = "issuedAt")]
    pub issued_at: Option<String>,
}
37
/// Returns the node registration URL for the given DDS base URL.
///
/// Any trailing `/` characters on the base are dropped before the fixed path
/// is appended, so the result never contains a doubled slash at the join.
fn registration_endpoint(dds_base_url: &str) -> String {
    let mut url = String::from(dds_base_url.trim_end_matches('/'));
    url.push_str("/internal/v1/nodes/register-wallet");
    url
}
42
/// Returns the SIWE nonce-request URL for the given DDS base URL.
///
/// Any trailing `/` characters on the base are dropped before the fixed path
/// is appended, so the result never contains a doubled slash at the join.
fn siwe_request_endpoint(dds_base_url: &str) -> String {
    let mut url = String::from(dds_base_url.trim_end_matches('/'));
    url.push_str("/internal/v1/auth/siwe/request");
    url
}
47
48async fn request_siwe_meta(
49    dds_base_url: &str,
50    wallet: &str,
51    client: &Client,
52) -> std::result::Result<SiweRequestMeta, RegistrationAttempt> {
53    let endpoint = siwe_request_endpoint(dds_base_url);
54    let res = client
55        .post(&endpoint)
56        .json(&serde_json::json!({ "wallet": wallet }))
57        .send()
58        .await
59        .map_err(|err| {
60            RegistrationAttempt::retryable_failure(format!(
61                "request SIWE nonce failed: endpoint {}, error: {}",
62                endpoint, err
63            ))
64        })?;
65    let status = res.status();
66    if !status.is_success() {
67        let body_snippet = response_body_snippet(res).await;
68        return Err(classify_http_status(
69            status,
70            format!(
71                "request SIWE nonce failed: status {}, endpoint {}, body_snippet: {}",
72                status, endpoint, body_snippet
73            ),
74        ));
75    }
76    let body: SiweRequestMeta = res.json().await.map_err(|err| {
77        RegistrationAttempt::retryable_failure(format!(
78            "decode SIWE nonce response failed: endpoint {}, error: {}",
79            endpoint, err
80        ))
81    })?;
82    if body.nonce.as_deref().unwrap_or("").is_empty() {
83        return Err(RegistrationAttempt::retryable_failure(
84            "siwe nonce missing in response".to_string(),
85        ));
86    }
87    Ok(body)
88}
89
90fn compose_message(meta: &SiweRequestMeta, address: &str) -> anyhow::Result<String> {
91    let domain = meta
92        .domain
93        .as_deref()
94        .ok_or_else(|| anyhow!("siwe domain missing"))?;
95    let uri = meta
96        .uri
97        .as_deref()
98        .ok_or_else(|| anyhow!("siwe uri missing"))?;
99    let version = meta
100        .version
101        .as_deref()
102        .ok_or_else(|| anyhow!("siwe version missing"))?;
103    let chain_id = meta
104        .chain_id
105        .ok_or_else(|| anyhow!("siwe chain id missing"))?;
106    let nonce = meta
107        .nonce
108        .as_deref()
109        .ok_or_else(|| anyhow!("siwe nonce missing"))?;
110    let issued_at = meta
111        .issued_at
112        .as_deref()
113        .ok_or_else(|| anyhow!("siwe issued_at missing"))?;
114
115    let mut out = String::new();
116    out.push_str(&format!(
117        "{} wants you to sign in with your Ethereum account:\n",
118        domain
119    ));
120    out.push_str(address);
121    out.push_str("\n\n");
122    out.push_str(&format!("URI: {}\n", uri));
123    out.push_str(&format!("Version: {}\n", version));
124    out.push_str(&format!("Chain ID: {}\n", chain_id));
125    out.push_str(&format!("Nonce: {}\n", nonce));
126    out.push_str(&format!("Issued At: {}", issued_at));
127    Ok(out)
128}
129
/// Category of a single registration attempt's outcome; `run_registration_loop`
/// uses it to decide how long to wait before the next attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RegistrationAttemptKind {
    /// DDS answered with a success status.
    Registered,
    /// DDS answered `409 Conflict`.
    Conflict,
    /// Transient failure (transport error, timeout-like or 5xx status, ...);
    /// the loop retries with exponential backoff.
    RetryableFailure,
    /// Any other failure; the loop retries only after the full register interval.
    SlowRetryFailure,
}
137
/// Result of one call to `register_once` (also used as the error type of the
/// internal SIWE request helper).
#[derive(Debug, Clone)]
pub struct RegistrationAttempt {
    /// Outcome category.
    kind: RegistrationAttemptKind,
    /// Human-readable failure description; `None` for a successful registration.
    error: Option<String>,
}
143
144impl RegistrationAttempt {
145    fn registered() -> Self {
146        Self {
147            kind: RegistrationAttemptKind::Registered,
148            error: None,
149        }
150    }
151
152    fn conflict(error: String) -> Self {
153        Self {
154            kind: RegistrationAttemptKind::Conflict,
155            error: Some(error),
156        }
157    }
158
159    fn retryable_failure(error: String) -> Self {
160        Self {
161            kind: RegistrationAttemptKind::RetryableFailure,
162            error: Some(error),
163        }
164    }
165
166    fn slow_retry_failure(error: String) -> Self {
167        Self {
168            kind: RegistrationAttemptKind::SlowRetryFailure,
169            error: Some(error),
170        }
171    }
172
173    fn error_text(&self) -> &str {
174        self.error.as_deref().unwrap_or("")
175    }
176}
177
178fn classify_http_status(status: StatusCode, error: String) -> RegistrationAttempt {
179    match status {
180        StatusCode::CONFLICT => RegistrationAttempt::conflict(error),
181        StatusCode::REQUEST_TIMEOUT
182        | StatusCode::TOO_MANY_REQUESTS
183        | StatusCode::BAD_GATEWAY
184        | StatusCode::SERVICE_UNAVAILABLE
185        | StatusCode::GATEWAY_TIMEOUT => RegistrationAttempt::retryable_failure(error),
186        s if s.is_server_error() => RegistrationAttempt::retryable_failure(error),
187        _ => RegistrationAttempt::slow_retry_failure(error),
188    }
189}
190
191async fn response_body_snippet(res: reqwest::Response) -> String {
192    match res.text().await {
193        Ok(mut text) => {
194            if text.len() > 512 {
195                text.truncate(512);
196            }
197            text.replace('\n', " ")
198        }
199        Err(_) => "<unavailable>".to_string(),
200    }
201}
202
203pub async fn register_once(
204    dds_base_url: &str,
205    node_version: &str,
206    reg_secret: &str,
207    sk: &SecretKey,
208    client: &Client,
209    capabilities: &[String],
210) -> RegistrationAttempt {
211    if capabilities.is_empty() {
212        return RegistrationAttempt::slow_retry_failure(
213            "capabilities must be non-empty for DDS registration".to_string(),
214        );
215    }
216    let wallet = derive_eth_address(sk);
217    let wallet_prefix = wallet.get(0..10).unwrap_or(&wallet);
218    info!(
219        wallet_prefix = wallet_prefix,
220        version = node_version,
221        capabilities = ?capabilities,
222        "Registering node with DDS (SIWE)"
223    );
224
225    let meta = match request_siwe_meta(dds_base_url, &wallet, client).await {
226        Ok(meta) => meta,
227        Err(attempt) => return attempt,
228    };
229    let message = match compose_message(&meta, &wallet) {
230        Ok(message) => message,
231        Err(err) => {
232            return RegistrationAttempt::retryable_failure(format!(
233                "compose SIWE message failed: {}",
234                err
235            ));
236        }
237    };
238    let signature = sign_eip191_recoverable_hex(sk, &message);
239    let req = NodeRegisterWalletRequest {
240        message,
241        signature,
242        registration_credentials: reg_secret.to_owned(),
243        capabilities: capabilities.to_vec(),
244        version: node_version.to_owned(),
245    };
246    let endpoint = registration_endpoint(dds_base_url);
247
248    let res = client
249        .post(&endpoint)
250        .json(&req)
251        .send()
252        .await
253        .map_err(|err| {
254            RegistrationAttempt::retryable_failure(format!(
255                "registration request failed: endpoint {}, error: {}",
256                endpoint, err
257            ))
258        });
259
260    let res = match res {
261        Ok(res) => res,
262        Err(attempt) => return attempt,
263    };
264
265    if res.status().is_success() {
266        debug!(status = ?res.status(), "Registration ok");
267        RegistrationAttempt::registered()
268    } else {
269        let status = res.status();
270        let body_snippet = response_body_snippet(res).await;
271        classify_http_status(
272            status,
273            format!(
274                "registration failed: status {}, endpoint {}, body_snippet: {}",
275                status, endpoint, body_snippet
276            ),
277        )
278    }
279}
280
281#[derive(Debug)]
282pub struct RegistrationConfig {
283    pub dds_base_url: String,
284    pub node_version: String,
285    pub reg_secret: String,
286    pub secp256k1_privhex: String,
287    pub client: Client,
288    pub register_interval_secs: u64,
289    pub max_retry: i32,
290    pub capabilities: Vec<String>,
291}
292
/// Runs the registration state machine forever (it only returns early when the
/// configured private key cannot be loaded).
///
/// Each iteration sleeps for `next_sleep`, reads the persisted status and then:
/// - `STATUS_REGISTERED`: parks and polls again after `PARKED_POLL_INTERVAL`;
/// - `STATUS_DISCONNECTED` / `STATUS_REGISTERING`: tries to take the
///   registration lock, performs one `register_once` attempt and schedules the
///   next iteration according to the attempt's outcome;
/// - anything else: resets the status to `STATUS_DISCONNECTED`.
///
/// Scheduling per outcome:
/// - registered: status becomes `STATUS_REGISTERED`, next poll after
///   `PARKED_POLL_INTERVAL`;
/// - conflict / slow-retry failure: retry after the register interval, with
///   `warn!` logs rate-limited by `RATE_LIMITED_WARN_INTERVAL`;
/// - retryable failure: exponential backoff (2^attempt seconds, capped at 60)
///   with a 0.8x-1.2x jitter; once `max_retry` (if non-negative) consecutive
///   failures are reached the counter resets and the loop waits one register
///   interval.
pub async fn run_registration_loop(cfg: RegistrationConfig) {
    let RegistrationConfig {
        dds_base_url,
        node_version,
        reg_secret,
        secp256k1_privhex,
        client,
        register_interval_secs,
        max_retry,
        capabilities,
    } = cfg;
    // Without a usable key nothing can be signed, so the loop never starts.
    let sk = match load_secp256k1_privhex(&secp256k1_privhex) {
        Ok(k) => k,
        Err(e) => {
            warn!("Invalid secp256k1 private key (redacted): {}", e);
            return;
        }
    };

    // A configured interval of 0 is treated as 1 second.
    let register_interval = Duration::from_secs(register_interval_secs.max(1));
    // Staleness threshold handed to the lock: twice the register interval,
    // clamped to the range [30 s, 600 s].
    let lock_stale_after = {
        let base = register_interval.saturating_mul(2);
        let min = Duration::from_secs(30);
        let max = Duration::from_secs(600);
        if base < min {
            min
        } else if base > max {
            max
        } else {
            base
        }
    };

    // Backoff base in seconds: 2^attempt capped at 60; 0 for non-positive attempts.
    fn timer_interval_secs(attempt: i32) -> u64 {
        if attempt <= 0 {
            return 0;
        }
        let p = 2_i64.saturating_pow(attempt as u32);
        p.clamp(0, 60) as u64
    }

    // Writes back the status that is currently stored (or the default status
    // value when the state cannot be read); the result is deliberately ignored.
    // NOTE(review): presumably this materializes/normalizes the state on startup — confirm.
    let _ = set_status(read_state().map(|s| s.status).unwrap_or_default().as_str());

    // Consecutive retryable failures since the last non-retryable outcome.
    let mut transient_attempt: i32 = 0;
    // First iteration runs immediately.
    let mut next_sleep = Duration::ZERO;
    // Start of the current run of consecutive conflicts (for `blocked_ms`).
    let mut conflict_episode_started_at: Option<Instant> = None;
    // Earliest time the next conflict may be logged at warn level.
    let mut next_conflict_warn_at: Option<Instant> = None;
    // Time of the last warn-level slow-retry log.
    let mut last_slow_warn_at: Option<Instant> = None;

    info!(
        event = "registration.loop.start",
        register_interval_sec = register_interval.as_secs() as i64,
        node_version = %node_version,
        "registration loop started"
    );

    loop {
        tokio::time::sleep(next_sleep).await;
        // An unreadable state is treated as the default state.
        let RegistrationState { status, .. } = read_state().unwrap_or_default();

        match status.as_str() {
            STATUS_REGISTERED => {
                // Parked: nothing to do until the status changes.
                next_sleep = PARKED_POLL_INTERVAL;
                continue;
            }
            STATUS_DISCONNECTED | STATUS_REGISTERING => {
                // Only the lock holder attempts registration; both "busy" and
                // "error" skip this round and retry after the register interval.
                let lock_guard = match LockGuard::try_acquire(lock_stale_after) {
                    Ok(Some(g)) => {
                        info!(event = "lock.acquired", "registration lock acquired");
                        Some(g)
                    }
                    Ok(None) => {
                        debug!(event = "lock.busy", "another registrar is active");
                        next_sleep = register_interval;
                        continue;
                    }
                    Err(e) => {
                        warn!(event = "lock.error", error = %e, "could not acquire lock");
                        next_sleep = register_interval;
                        continue;
                    }
                };

                // A failed transition is only logged; the attempt still proceeds.
                if status.as_str() == STATUS_DISCONNECTED {
                    if let Err(e) = set_status(STATUS_REGISTERING) {
                        warn!(event = "status.transition.error", error = %e);
                    } else {
                        info!(
                            event = "status.transition",
                            from = STATUS_DISCONNECTED,
                            to = STATUS_REGISTERING,
                            "moved to registering"
                        );
                    }
                }
                let start = Instant::now();
                let attempt = register_once(
                    &dds_base_url,
                    &node_version,
                    &reg_secret,
                    &sk,
                    &client,
                    &capabilities,
                )
                .await;
                let elapsed_ms = start.elapsed().as_millis();

                match attempt.kind {
                    RegistrationAttemptKind::Registered => {
                        // A failure to persist the status is ignored here; the next
                        // poll would then still see a non-registered status.
                        let _ = set_status(STATUS_REGISTERED);
                        info!(
                            event = "registration.success",
                            elapsed_ms = elapsed_ms as i64,
                            "successfully registered to DDS"
                        );
                        // Success clears every failure-tracking variable.
                        transient_attempt = 0;
                        conflict_episode_started_at = None;
                        next_conflict_warn_at = None;
                        last_slow_warn_at = None;
                        next_sleep = PARKED_POLL_INTERVAL;
                        drop(lock_guard);
                    }
                    RegistrationAttemptKind::Conflict => {
                        transient_attempt = 0;
                        last_slow_warn_at = None;
                        let now = Instant::now();
                        // Warn on the first conflict of an episode, then at most once
                        // per RATE_LIMITED_WARN_INTERVAL.
                        let should_warn = match next_conflict_warn_at {
                            Some(deadline) => now >= deadline,
                            None => true,
                        };
                        // Time since the first conflict of this episode (0 on the first one).
                        let blocked_ms = if let Some(started_at) = conflict_episode_started_at {
                            now.duration_since(started_at).as_millis() as i64
                        } else {
                            0
                        };
                        if conflict_episode_started_at.is_none() {
                            conflict_episode_started_at = Some(now);
                        }
                        if should_warn {
                            next_conflict_warn_at = Some(now + RATE_LIMITED_WARN_INTERVAL);
                            warn!(
                                event = "registration.conflict",
                                elapsed_ms = elapsed_ms as i64,
                                blocked_ms,
                                error = attempt.error_text(),
                                "registration blocked by an existing online node; will retry after cooldown"
                            );
                        } else {
                            debug!(
                                event = "registration.conflict",
                                elapsed_ms = elapsed_ms as i64,
                                blocked_ms,
                                error = attempt.error_text(),
                                "registration still blocked by an existing online node"
                            );
                        }
                        next_sleep = register_interval;
                        drop(lock_guard);
                    }
                    RegistrationAttemptKind::RetryableFailure => {
                        // A non-conflict outcome ends any running conflict episode.
                        conflict_episode_started_at = None;
                        next_conflict_warn_at = None;
                        transient_attempt += 1;
                        warn!(
                            event = "registration.error",
                            elapsed_ms = elapsed_ms as i64,
                            error = attempt.error_text(),
                            attempt = transient_attempt,
                            "registration to DDS failed; will back off"
                        );
                        // Negative `max_retry` disables the cap.
                        if max_retry >= 0 && transient_attempt >= max_retry {
                            warn!(
                                event = "registration.max_retry_reached",
                                max_retry = max_retry,
                                "max retry reached; pausing until next TTL window"
                            );
                            transient_attempt = 0;
                            next_sleep = register_interval;
                            drop(lock_guard);
                            continue;
                        }
                        // Exponential backoff with a random 0.8x-1.2x jitter.
                        let base = Duration::from_secs(timer_interval_secs(transient_attempt));
                        let jitter_factor: f64 = rand::thread_rng().gen_range(0.8..=1.2);
                        next_sleep =
                            Duration::from_secs_f64(base.as_secs_f64() * jitter_factor.max(0.1));
                        drop(lock_guard);
                    }
                    RegistrationAttemptKind::SlowRetryFailure => {
                        transient_attempt = 0;
                        conflict_episode_started_at = None;
                        next_conflict_warn_at = None;
                        let now = Instant::now();
                        // Note: despite its name, `deadline` below holds the time of the
                        // last warn-level log, not a future deadline.
                        let should_warn = match last_slow_warn_at {
                            Some(deadline) => {
                                now.duration_since(deadline) >= RATE_LIMITED_WARN_INTERVAL
                            }
                            None => true,
                        };
                        if should_warn {
                            last_slow_warn_at = Some(now);
                            warn!(
                                event = "registration.error",
                                elapsed_ms = elapsed_ms as i64,
                                error = attempt.error_text(),
                                "registration to DDS failed; will retry after cooldown"
                            );
                        } else {
                            debug!(
                                event = "registration.error",
                                elapsed_ms = elapsed_ms as i64,
                                error = attempt.error_text(),
                                "registration to DDS still blocked by a non-retryable error"
                            );
                        }
                        next_sleep = register_interval;
                        drop(lock_guard);
                    }
                }
            }
            other => {
                // Unrecognized persisted status: fall back to disconnected and
                // re-evaluate after one second.
                warn!(
                    event = "status.unknown",
                    status = other,
                    "unknown status; resetting to disconnected"
                );
                let _ = set_status(STATUS_DISCONNECTED);
                next_sleep = Duration::from_secs(1);
            }
        }
    }
}
524
/// Tests for `register_once` and `run_registration_loop`, run against local
/// in-process HTTP servers (or an address expected to refuse connections).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::load_secp256k1_privhex;
    use crate::state::{clear_node_secret, write_state, RegistrationState};
    use axum::{http::StatusCode, routing::post, Router};
    use parking_lot::Mutex as PLMutex;
    use std::io;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::sync::OnceLock;
    use tokio::net::TcpListener;
    use tracing::subscriber;
    use tracing_subscriber::layer::SubscriberExt;

    /// `io::Write` sink that appends everything to a shared in-memory buffer.
    struct BufWriter(Arc<PLMutex<Vec<u8>>>);
    impl io::Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.lock().extend_from_slice(buf);
            Ok(buf.len())
        }
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }
    /// `MakeWriter` that hands out `BufWriter`s sharing one buffer, so a test can
    /// inspect everything the tracing layer wrote.
    struct MakeBufWriter(Arc<PLMutex<Vec<u8>>>);
    impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for MakeBufWriter {
        type Writer = BufWriter;
        fn make_writer(&'a self) -> Self::Writer {
            BufWriter(self.0.clone())
        }
    }

    /// Process-wide mutex taken by every test in this module so that they do not
    /// run concurrently (they all reset the registration state first).
    fn test_lock() -> &'static PLMutex<()> {
        static TEST_LOCK: OnceLock<PLMutex<()>> = OnceLock::new();
        TEST_LOCK.get_or_init(|| PLMutex::new(()))
    }

    /// Clears the node secret and writes the default registration state.
    fn reset_registration_state() {
        clear_node_secret().unwrap();
        write_state(&RegistrationState::default()).unwrap();
    }

    /// Captures log output during a registration attempt and checks that the
    /// registration secret never appears in it.
    #[tokio::test(flavor = "current_thread")]
    async fn logs_do_not_include_secret() {
        let _guard = test_lock().lock();
        reset_registration_state();

        let buf = Arc::new(PLMutex::new(Vec::<u8>::new()));
        let make = MakeBufWriter(buf.clone());
        let layer = tracing_subscriber::fmt::layer()
            .with_writer(make)
            .with_ansi(false)
            .without_time();
        let subscriber = tracing_subscriber::registry().with(layer);
        // Shadows the name only: the test-lock guard above stays alive until the
        // end of the function as well.
        let _guard = subscriber::set_default(subscriber);

        let secret = "my-super-secret";
        // NOTE(review): presumably nothing listens on this port, so the request
        // fails quickly — confirm for the CI environment.
        let dds = "http://127.0.0.1:9";
        let version = "1.2.3";
        let sk = load_secp256k1_privhex(
            "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace",
        )
        .unwrap();
        let client = reqwest::Client::builder()
            .no_proxy()
            .timeout(Duration::from_millis(200))
            .build()
            .unwrap();
        let capabilities = vec![
            "/reconstruction/global-refinement/v1".to_string(),
            "/reconstruction/local-refinement/v1".to_string(),
        ];

        // The outcome itself is irrelevant; only the emitted logs are inspected.
        let _ = register_once(dds, version, secret, &sk, &client, &capabilities).await;

        let captured = String::from_utf8(buf.lock().clone()).unwrap_or_default();
        assert!(captured.contains("Registering node with DDS"));
        assert!(
            !captured.contains(secret),
            "logs leaked secret: {}",
            captured
        );
    }

    /// A `409 Conflict` from the server must be reported as a `Conflict` attempt.
    #[tokio::test(flavor = "current_thread")]
    async fn register_once_classifies_conflict() {
        let _guard = test_lock().lock();
        reset_registration_state();

        async fn conflict_handler() -> StatusCode {
            StatusCode::CONFLICT
        }

        // Both endpoints answer 409; the SIWE request is the first one hit.
        let app = Router::new()
            .route("/internal/v1/auth/siwe/request", post(conflict_handler))
            .route("/internal/v1/nodes/register-wallet", post(conflict_handler));
        // Port 0 lets the OS pick a free port.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let client = reqwest::Client::builder().no_proxy().build().unwrap();
        let sk = load_secp256k1_privhex(
            "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace",
        )
        .unwrap();
        let attempt = register_once(
            &format!("http://{}", addr),
            "1.2.3",
            "secret",
            &sk,
            &client,
            &["/cap/example/v1".to_string()],
        )
        .await;

        assert_eq!(attempt.kind, RegistrationAttemptKind::Conflict);
        server.abort();
    }

    /// After a successful registration the loop must park: the status becomes
    /// registered and neither endpoint is called a second time.
    #[tokio::test(flavor = "current_thread")]
    async fn registration_loop_parks_after_success() {
        let _guard = test_lock().lock();
        reset_registration_state();

        // Per-endpoint request counters.
        let request_hits = Arc::new(AtomicUsize::new(0));
        let register_hits = Arc::new(AtomicUsize::new(0));

        // Returns a complete SIWE metadata document.
        async fn nonce_handler() -> axum::Json<serde_json::Value> {
            axum::Json(serde_json::json!({
                "nonce": "abc12345",
                "domain": "dds.example.com",
                "uri": "https://dds.example.com",
                "version": "1",
                "chainId": 8453,
                "issuedAt": "2026-01-01T00:00:00Z"
            }))
        }

        let request_hits_clone = Arc::clone(&request_hits);
        let register_hits_clone = Arc::clone(&register_hits);
        let app = Router::new()
            .route(
                "/internal/v1/auth/siwe/request",
                post(move || {
                    let request_hits = Arc::clone(&request_hits_clone);
                    async move {
                        request_hits.fetch_add(1, Ordering::SeqCst);
                        nonce_handler().await
                    }
                }),
            )
            .route(
                "/internal/v1/nodes/register-wallet",
                post(move || {
                    let register_hits = Arc::clone(&register_hits_clone);
                    async move {
                        register_hits.fetch_add(1, Ordering::SeqCst);
                        StatusCode::OK
                    }
                }),
            );
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let client = reqwest::Client::builder().no_proxy().build().unwrap();
        let cfg = RegistrationConfig {
            dds_base_url: format!("http://{}", addr),
            node_version: "1.2.3".to_string(),
            reg_secret: "secret".to_string(),
            secp256k1_privhex: "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace"
                .to_string(),
            client,
            register_interval_secs: 1,
            // Negative: no cap on consecutive retryable failures.
            max_retry: -1,
            capabilities: vec!["/cap/example/v1".to_string()],
        };

        let handle = tokio::spawn(async move {
            run_registration_loop(cfg).await;
        });

        // Give the loop time to complete its first (successful) attempt.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert_eq!(read_state().unwrap().status, STATUS_REGISTERED);

        // Wait longer than one register interval: a parked loop must not have
        // contacted either endpoint again.
        tokio::time::sleep(Duration::from_millis(1300)).await;
        assert_eq!(request_hits.load(Ordering::SeqCst), 1);
        assert_eq!(register_hits.load(Ordering::SeqCst), 1);

        handle.abort();
        server.abort();
    }
}