posemesh_compute_node/auth/
siwe_after_registration.rs1use super::siwe;
2use super::token_manager::{
3 AccessAuthenticator, SystemClock, TokenManager, TokenManagerConfig, TokenProvider,
4 TokenProviderError,
5};
6use crate::config::NodeConfig;
7use crate::dds::persist as dds_state;
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use posemesh_node_registration::state::{
11 read_state, set_status, STATUS_DISCONNECTED, STATUS_REGISTERED,
12};
13use reqwest::StatusCode;
14use sha3::{Digest, Keccak256};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::Mutex;
18use tokio::time::sleep;
19use tracing::{info, warn};
20
21type ManagerCell = Arc<Mutex<Option<Arc<SiweTokenManager>>>>;
22type SiweTokenManager = TokenManager<DdsAuthenticator, SystemClock>;
23
/// Endpoint and key material used to perform SIWE logins against DDS.
///
/// Every field is behind an [`Arc`], so cloning the authenticator only bumps
/// reference counts instead of copying the strings.
#[derive(Clone)]
struct DdsAuthenticator {
    /// Base URL handed to the SIWE nonce and verify requests.
    base_url: Arc<String>,
    /// Hex-encoded secp256k1 private key used to sign the SIWE message.
    priv_hex: Arc<String>,
    /// Address string sent with the nonce and verify requests.
    address: Arc<String>,
}
30
31impl DdsAuthenticator {
32 fn new(base_url: String, priv_hex: String) -> Result<Self> {
33 let address = derive_eth_address(&priv_hex)?;
34 Ok(Self {
35 base_url: Arc::new(base_url),
36 priv_hex: Arc::new(priv_hex),
37 address: Arc::new(address),
38 })
39 }
40}
41
42#[async_trait]
43impl AccessAuthenticator for DdsAuthenticator {
44 async fn login(&self) -> Result<super::siwe::AccessBundle, super::siwe::SiweError> {
45 let meta = siwe::request_nonce(self.base_url.as_str(), self.address.as_str()).await?;
46 let message = siwe::compose_message(&meta, self.address.as_str(), None)?;
47 let signature = siwe::sign_message(self.priv_hex.as_str(), &message)?;
48 match siwe::verify(
49 self.base_url.as_str(),
50 self.address.as_str(),
51 &message,
52 &signature,
53 )
54 .await
55 {
56 Ok(bundle) => Ok(bundle),
57 Err(err) => {
58 rearm_registration_if_invalid(&err);
59 Err(err)
60 }
61 }
62 }
63}
64
65fn rearm_registration_if_invalid(err: &siwe::SiweError) {
66 let Some(status) = err.status_code() else {
67 return;
68 };
69 if status != StatusCode::FORBIDDEN && status != StatusCode::NOT_FOUND {
70 return;
71 }
72
73 if let Ok(state) = read_state() {
74 if state.status == STATUS_DISCONNECTED {
75 return;
76 }
77 }
78
79 match set_status(STATUS_DISCONNECTED) {
80 Ok(()) => warn!(
81 status = %status,
82 "DDS SIWE verification indicates registration is no longer valid; re-arming registration"
83 ),
84 Err(set_err) => warn!(
85 status = %status,
86 error = %set_err,
87 "failed to re-arm registration after DDS SIWE verification invalidation"
88 ),
89 }
90}
91
92pub struct SiweAfterRegistration {
93 authenticator: Arc<DdsAuthenticator>,
94 config: TokenManagerConfig,
95 manager: ManagerCell,
96}
97
98impl SiweAfterRegistration {
99 pub fn from_config(cfg: &NodeConfig) -> Result<Self> {
100 let dds_base_url = cfg
101 .dds_base_url
102 .as_ref()
103 .ok_or_else(|| anyhow!("DDS_BASE_URL required for DDS SIWE authentication"))?
104 .as_str()
105 .to_string();
106
107 let priv_hex = cfg
108 .secp256k1_privhex
109 .as_ref()
110 .filter(|value| !value.trim().is_empty())
111 .ok_or_else(|| anyhow!("SECP256K1_PRIVHEX required for DDS SIWE authentication"))?
112 .to_string();
113
114 let config = TokenManagerConfig {
115 safety_ratio: cfg.token_safety_ratio as f64,
116 max_retries: cfg.token_reauth_max_retries,
117 jitter: Duration::from_millis(cfg.token_reauth_jitter_ms),
118 };
119
120 Self::new(dds_base_url, priv_hex, config)
121 }
122
123 pub fn new(dds_base_url: String, priv_hex: String, config: TokenManagerConfig) -> Result<Self> {
124 let authenticator = Arc::new(DdsAuthenticator::new(dds_base_url, priv_hex)?);
125 Ok(Self {
126 authenticator,
127 config,
128 manager: Arc::new(Mutex::new(None)),
129 })
130 }
131
132 pub async fn start(&self) -> Result<SiweHandle> {
133 let manager = self.ensure_started().await?;
134 Ok(SiweHandle { manager })
135 }
136
137 async fn ensure_started(&self) -> Result<Arc<SiweTokenManager>> {
138 {
139 let guard = self.manager.lock().await;
140 if let Some(existing) = guard.as_ref() {
141 return Ok(existing.clone());
142 }
143 }
144
145 self.wait_for_registration().await?;
146
147 let manager: Arc<SiweTokenManager> = Arc::new(TokenManager::new(
148 Arc::clone(&self.authenticator),
149 Arc::new(SystemClock),
150 self.config.clone(),
151 ));
152 manager.start_bg().await;
153
154 manager
155 .bearer()
156 .await
157 .map_err(|err| anyhow!("initial DDS SIWE login failed: {err}"))?;
158
159 let mut guard = self.manager.lock().await;
160 if let Some(existing) = guard.as_ref() {
161 manager.stop_bg().await;
162 return Ok(existing.clone());
163 }
164 *guard = Some(manager.clone());
165 Ok(manager)
166 }
167
168 async fn wait_for_registration(&self) -> Result<()> {
169 loop {
170 match read_state() {
172 Ok(state) if state.status == STATUS_REGISTERED => {
173 info!(
174 "DDS registration confirmed (status=registered); starting SIWE token manager"
175 );
176 return Ok(());
177 }
178 Ok(_) => {
179 }
181 Err(err) => {
182 warn!(error = %err, "Failed to read DDS registration state; retrying");
183 }
184 }
185
186 match dds_state::read_node_secret() {
187 Ok(Some(_)) => {
188 info!(
189 "DDS registration confirmed (secret present); starting SIWE token manager"
190 );
191 return Ok(());
192 }
193 Ok(None) => {}
194 Err(err) => {
195 warn!(error = %err, "Failed to read DDS registration secret; retrying");
196 }
197 }
198
199 sleep(Duration::from_secs(1)).await;
200 }
201 }
202}
203
204#[derive(Clone)]
205pub struct SiweHandle {
206 manager: Arc<SiweTokenManager>,
207}
208
209impl SiweHandle {
210 pub async fn bearer(&self) -> Result<String, TokenProviderError> {
211 self.manager.bearer().await
212 }
213
214 pub async fn shutdown(&self) {
215 self.manager.stop_bg().await;
216 }
217}
218
219#[async_trait]
220impl TokenProvider for SiweHandle {
221 async fn bearer(&self) -> crate::auth::token_manager::TokenProviderResult<String> {
222 self.manager.bearer().await
224 }
225
226 async fn on_unauthorized(&self) {
227 self.manager.on_unauthorized_retry().await;
229 }
230}
231
232fn derive_eth_address(priv_hex: &str) -> Result<String> {
233 use k256::{ecdsa::SigningKey, FieldBytes};
234
235 let trimmed = priv_hex.trim_start_matches("0x");
236 let key_bytes =
237 hex::decode(trimmed).map_err(|_| anyhow!("invalid secp256k1 private key hex"))?;
238 if key_bytes.len() != 32 {
239 return Err(anyhow!("secp256k1 private key must be 32 bytes"));
240 }
241 let mut key = [0u8; 32];
242 key.copy_from_slice(&key_bytes);
243 let field_bytes: FieldBytes = key.into();
244 let signing_key = SigningKey::from_bytes(&field_bytes)
245 .map_err(|e| anyhow!("failed to construct signing key: {e}"))?;
246 let verifying_key = signing_key.verifying_key();
247 let encoded = verifying_key.to_encoded_point(false);
248 let pubkey = encoded.as_bytes();
249
250 let mut hasher = Keccak256::new();
251 hasher.update(&pubkey[1..]);
252 let hashed = hasher.finalize();
253 let address_bytes = &hashed[12..];
254 Ok(format!("0x{}", hex::encode(address_bytes)))
255}
256
#[cfg(test)]
mod tests {
    use super::*;
    use posemesh_node_registration::state::{write_state, RegistrationState};
    use reqwest::StatusCode;
    use std::sync::{Mutex, MutexGuard, OnceLock};
    use url::Url;

    /// Private key shared by the tests that need a well-formed secp256k1 key.
    const TEST_PRIV_HEX: &str = "4c0883a69102937d6231471b5dbb6204fe5129617082798ce3f4fdf2548b6f90";

    /// Serialises the tests that read and write the shared registration state.
    ///
    /// A poisoned lock is recovered instead of unwrapped: the mutex guards no data,
    /// and without recovery one failing test would make every later state test
    /// panic on the lock rather than report its own result.
    fn lock_state() -> MutexGuard<'static, ()> {
        static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        TEST_LOCK
            .get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Resets the persisted registration state to "registered".
    fn seed_registered_state() {
        write_state(&RegistrationState {
            status: STATUS_REGISTERED.to_string(),
            last_healthcheck: None,
        })
        .unwrap();
    }

    /// Configuration with neither the DDS URL nor the signing key set.
    fn base_cfg() -> NodeConfig {
        NodeConfig {
            dms_base_url: Url::parse("https://dms.example").unwrap(),
            node_version: "1.0.0".into(),
            request_timeout_secs: 10,
            dds_base_url: None,
            reg_secret: None,
            secp256k1_privhex: None,
            heartbeat_jitter_ms: 250,
            heartbeat_min_ratio: 0.25,
            heartbeat_max_ratio: 0.35,
            poll_backoff_ms_min: 1000,
            poll_backoff_ms_max: 30000,
            token_safety_ratio: 0.75,
            token_reauth_max_retries: 3,
            token_reauth_jitter_ms: 500,
            register_interval_secs: None,
            register_max_retry: None,
            max_concurrency: 1,
            log_format: crate::config::LogFormat::Json,
            enable_noop: true,
            noop_sleep_secs: 1,
        }
    }

    #[test]
    fn from_config_errors_when_missing_siwe_fields() {
        let cfg = base_cfg();
        assert!(SiweAfterRegistration::from_config(&cfg).is_err());
    }

    #[test]
    fn derive_eth_address_matches_expected_value() {
        let addr = derive_eth_address(TEST_PRIV_HEX).expect("address");
        assert_eq!(addr, "0xfdbb6caf01414300c16ea14859fec7736d95355f");
    }

    #[test]
    fn from_config_produces_instance_when_siwe_configured() {
        let mut cfg = base_cfg();
        cfg.dds_base_url = Some(Url::parse("https://dds.example").unwrap());
        cfg.secp256k1_privhex = Some(TEST_PRIV_HEX.into());

        assert!(SiweAfterRegistration::from_config(&cfg).is_ok());
    }

    #[test]
    fn forbidden_siwe_verify_rearms_registration() {
        let _guard = lock_state();
        seed_registered_state();

        rearm_registration_if_invalid(&siwe::SiweError::UpstreamStatus(StatusCode::FORBIDDEN));

        assert_eq!(read_state().unwrap().status, STATUS_DISCONNECTED);
    }

    #[test]
    fn unrelated_siwe_error_keeps_registration_state() {
        let _guard = lock_state();
        seed_registered_state();

        rearm_registration_if_invalid(&siwe::SiweError::UpstreamStatus(StatusCode::BAD_GATEWAY));

        assert_eq!(read_state().unwrap().status, STATUS_REGISTERED);
    }
}