// posemesh_node_registration/register.rs
use crate::crypto::{
    format_timestamp_nanos, load_secp256k1_privhex, secp256k1_pubkey_uncompressed_hex,
    sign_recoverable_keccak_hex,
};
use crate::state::{
    read_state, set_status, touch_healthcheck_now, LockGuard, RegistrationState,
    STATUS_DISCONNECTED, STATUS_REGISTERED, STATUS_REGISTERING,
};
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use rand::Rng;
use reqwest::Client;
use secp256k1::SecretKey;
use serde::Serialize;
use std::fmt;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
18#[derive(Debug, Serialize)]
19pub struct NodeRegistrationRequest {
20 pub url: String,
21 pub version: String,
22 pub registration_credentials: String,
23 pub signature: String,
24 pub timestamp: String,
25 pub public_key: String,
26 pub capabilities: Vec<String>,
27}
28
/// Builds the absolute URL of the DDS node-registration endpoint.
///
/// All trailing `/` characters are stripped from `dds_base_url` first, so the
/// fixed path is never preceded by a doubled slash.
fn registration_endpoint(dds_base_url: &str) -> String {
    const REGISTER_PATH: &str = "/internal/v1/nodes/register";
    let mut endpoint = String::from(dds_base_url.trim_end_matches('/'));
    endpoint.push_str(REGISTER_PATH);
    endpoint
}
34fn build_registration_request(
35 node_url: &str,
36 node_version: &str,
37 reg_secret: &str,
38 sk: &SecretKey,
39 capabilities: &[String],
40) -> NodeRegistrationRequest {
41 let ts = format_timestamp_nanos(Utc::now());
42 let msg = format!("{}{}", node_url, ts);
43 let signature = sign_recoverable_keccak_hex(sk, msg.as_bytes());
44 let public_key = secp256k1_pubkey_uncompressed_hex(sk);
45 let registration_credentials = reg_secret.to_owned();
46 NodeRegistrationRequest {
47 url: node_url.to_owned(),
48 version: node_version.to_owned(),
49 registration_credentials,
50 signature,
51 timestamp: ts,
52 public_key,
53 capabilities: capabilities.to_vec(),
54 }
55}
56
57pub async fn register_once(
58 dds_base_url: &str,
59 node_url: &str,
60 node_version: &str,
61 reg_secret: &str,
62 sk: &SecretKey,
63 client: &Client,
64 capabilities: &[String],
65) -> Result<()> {
66 let req = build_registration_request(node_url, node_version, reg_secret, sk, capabilities);
67 let endpoint = registration_endpoint(dds_base_url);
68
69 let pk_short = req.public_key.get(0..16).unwrap_or(&req.public_key);
70 info!(
71 url = req.url,
72 version = req.version,
73 public_key_prefix = pk_short,
74 capabilities = ?req.capabilities,
75 "Registering node with DDS"
76 );
77
78 let res = client
79 .post(&endpoint)
80 .json(&req)
81 .send()
82 .await
83 .with_context(|| format!("POST {} failed", endpoint))?;
84
85 if res.status().is_success() {
86 debug!(status = ?res.status(), "Registration ok");
87 Ok(())
88 } else {
89 let status = res.status();
90 let body_snippet = match res.text().await {
91 Ok(mut text) => {
92 if text.len() > 512 {
93 text.truncate(512);
94 }
95 text.replace('\n', " ")
96 }
97 Err(_) => "<unavailable>".to_string(),
98 };
99 Err(anyhow!(
100 "registration failed: status {}, endpoint {}, body_snippet: {}",
101 status,
102 endpoint,
103 body_snippet
104 ))
105 }
106}
107
108#[derive(Debug)]
109pub struct RegistrationConfig {
110 pub dds_base_url: String,
111 pub node_url: String,
112 pub node_version: String,
113 pub reg_secret: String,
114 pub secp256k1_privhex: String,
115 pub client: Client,
116 pub register_interval_secs: u64,
117 pub max_retry: i32,
118 pub capabilities: Vec<String>,
119}
120
121pub async fn run_registration_loop(cfg: RegistrationConfig) {
122 let RegistrationConfig {
123 dds_base_url,
124 node_url,
125 node_version,
126 reg_secret,
127 secp256k1_privhex,
128 client,
129 register_interval_secs,
130 max_retry,
131 capabilities,
132 } = cfg;
133 let sk = match load_secp256k1_privhex(&secp256k1_privhex) {
134 Ok(k) => k,
135 Err(e) => {
136 warn!("Invalid secp256k1 private key (redacted): {}", e);
137 return;
138 }
139 };
140
141 let healthcheck_ttl = Duration::from_secs(register_interval_secs.max(1));
142 let lock_stale_after = {
143 let base = healthcheck_ttl.saturating_mul(2);
144 let min = Duration::from_secs(30);
145 let max = Duration::from_secs(600);
146 if base < min {
147 min
148 } else if base > max {
149 max
150 } else {
151 base
152 }
153 };
154
155 fn timer_interval_secs(attempt: i32) -> u64 {
156 if attempt <= 0 {
157 return 0;
158 }
159 let p = 2_i64.saturating_pow(attempt as u32);
160 p.clamp(0, 60) as u64
161 }
162
163 let _ = set_status(read_state().map(|s| s.status).unwrap_or_default().as_str());
164
165 let mut attempt: i32 = 0;
166 let mut next_sleep = Duration::from_secs(1);
167
168 info!(
169 event = "registration.loop.start",
170 healthcheck_ttl_sec = healthcheck_ttl.as_secs() as i64,
171 node_url = %node_url,
172 node_version = %node_version,
173 "registration loop started"
174 );
175
176 loop {
177 tokio::time::sleep(next_sleep).await;
178 let RegistrationState {
179 status,
180 last_healthcheck,
181 } = read_state().unwrap_or_default();
182
183 match status.as_str() {
184 STATUS_DISCONNECTED | STATUS_REGISTERING => {
185 let lock_guard = match LockGuard::try_acquire(lock_stale_after) {
186 Ok(Some(g)) => {
187 info!(event = "lock.acquired", "registration lock acquired");
188 Some(g)
189 }
190 Ok(None) => {
191 debug!(event = "lock.busy", "another registrar is active");
192 next_sleep = healthcheck_ttl;
193 continue;
194 }
195 Err(e) => {
196 warn!(event = "lock.error", error = %e, "could not acquire lock");
197 next_sleep = healthcheck_ttl;
198 continue;
199 }
200 };
201
202 if status.as_str() == STATUS_DISCONNECTED {
203 if let Err(e) = set_status(STATUS_REGISTERING) {
204 warn!(event = "status.transition.error", error = %e);
205 } else {
206 info!(
207 event = "status.transition",
208 from = STATUS_DISCONNECTED,
209 to = STATUS_REGISTERING,
210 "moved to registering"
211 );
212 }
213 }
214
215 attempt += 1;
216
217 let start = Instant::now();
218 let res = register_once(
219 &dds_base_url,
220 &node_url,
221 &node_version,
222 ®_secret,
223 &sk,
224 &client,
225 &capabilities,
226 )
227 .await;
228 let elapsed_ms = start.elapsed().as_millis();
229
230 match res {
231 Ok(()) => {
232 let _ = set_status(STATUS_REGISTERED);
233 let _ = touch_healthcheck_now();
234 info!(
235 event = "registration.success",
236 elapsed_ms = elapsed_ms as i64,
237 "successfully registered to DDS"
238 );
239 attempt = 0;
240 next_sleep = healthcheck_ttl;
241 drop(lock_guard);
242 }
243 Err(e) => {
244 warn!(
245 event = "registration.error",
246 elapsed_ms = elapsed_ms as i64,
247 error = %e,
248 error_debug = ?e,
249 attempt = attempt,
250 "registration to DDS failed; will back off"
251 );
252 if max_retry >= 0 && attempt >= max_retry {
253 warn!(
254 event = "registration.max_retry_reached",
255 max_retry = max_retry,
256 "max retry reached; pausing until next TTL window"
257 );
258 attempt = 0;
259 next_sleep = healthcheck_ttl;
260 drop(lock_guard);
261 continue;
262 }
263 let base = Duration::from_secs(timer_interval_secs(attempt));
264 let jitter_factor: f64 = rand::thread_rng().gen_range(0.8..=1.2);
265 next_sleep =
266 Duration::from_secs_f64(base.as_secs_f64() * jitter_factor.max(0.1));
267 drop(lock_guard);
268 }
269 }
270 }
271 STATUS_REGISTERED => {
272 let elapsed = last_healthcheck
273 .map(|t| Utc::now() - t)
274 .map(|d| d.to_std().unwrap_or_default())
275 .unwrap_or_else(|| Duration::from_secs(u64::MAX / 2));
276
277 if elapsed > healthcheck_ttl {
278 info!(
279 event = "healthcheck.expired",
280 elapsed_since_healthcheck_sec = elapsed.as_secs() as i64,
281 "healthcheck TTL exceeded; re-entering registering"
282 );
283 let _ = set_status(STATUS_REGISTERING);
284 next_sleep = Duration::from_secs(1);
285 } else {
286 next_sleep = healthcheck_ttl;
287 }
288 }
289 other => {
290 warn!(
291 event = "status.unknown",
292 status = other,
293 "unknown status; resetting to disconnected"
294 );
295 let _ = set_status(STATUS_DISCONNECTED);
296 next_sleep = Duration::from_secs(1);
297 }
298 }
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::load_secp256k1_privhex;
    use parking_lot::Mutex as PLMutex;
    use std::io;
    use std::sync::Arc;
    use tracing::subscriber;
    use tracing_subscriber::layer::SubscriberExt;

    /// Byte buffer shared between the test and every writer the fmt layer creates.
    type SharedLog = Arc<PLMutex<Vec<u8>>>;

    /// `io::Write` sink that appends all formatted log output to a [`SharedLog`].
    struct CaptureWriter(SharedLog);

    impl io::Write for CaptureWriter {
        fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
            self.0.lock().extend_from_slice(bytes);
            Ok(bytes.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            // Writes land directly in the shared buffer; nothing is pending.
            Ok(())
        }
    }

    /// Gives the fmt layer a fresh [`CaptureWriter`] over the same buffer per event.
    struct CaptureMakeWriter(SharedLog);

    impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for CaptureMakeWriter {
        type Writer = CaptureWriter;

        fn make_writer(&'a self) -> Self::Writer {
            CaptureWriter(Arc::clone(&self.0))
        }
    }

    /// Calls `register_once` against an address where nothing is expected to be
    /// listening, then checks that the registration secret never appears in the
    /// captured log output.
    #[tokio::test]
    async fn logs_do_not_include_secret() {
        let sink: SharedLog = Arc::new(PLMutex::new(Vec::new()));
        let fmt_layer = tracing_subscriber::fmt::layer()
            .with_writer(CaptureMakeWriter(Arc::clone(&sink)))
            .with_ansi(false)
            .without_time();
        let _default_guard =
            subscriber::set_default(tracing_subscriber::registry().with(fmt_layer));

        const SECRET: &str = "my-super-secret";
        let signing_key = load_secp256k1_privhex(
            "e331b6d69882b4ed5bb7f55b585d7d0f7dc3aeca4a3deee8d16bde3eca51aace",
        )
        .unwrap();
        // Keep the timeout short so the failed request returns quickly.
        let http = reqwest::Client::builder()
            .no_proxy()
            .timeout(Duration::from_millis(200))
            .build()
            .unwrap();
        let capabilities: Vec<String> = [
            "/reconstruction/global-refinement/v1",
            "/reconstruction/local-refinement/v1",
        ]
        .iter()
        .map(|cap| cap.to_string())
        .collect();

        // The request outcome is irrelevant; only the log output is inspected.
        let _ = register_once(
            "http://127.0.0.1:9",
            "https://node.example.com",
            "1.2.3",
            SECRET,
            &signing_key,
            &http,
            &capabilities,
        )
        .await;

        let captured = String::from_utf8(sink.lock().clone()).unwrap_or_default();
        assert!(captured.contains("Registering node with DDS"));
        assert!(!captured.contains(SECRET), "logs leaked secret: {}", captured);
    }
}
369}