Skip to main content

posemesh_compute_node/auth/
siwe_after_registration.rs

1use super::siwe;
2use super::token_manager::{
3    AccessAuthenticator, SystemClock, TokenManager, TokenManagerConfig, TokenProvider,
4    TokenProviderError,
5};
6use crate::config::NodeConfig;
7use crate::dds::persist as dds_state;
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use posemesh_node_registration::state::{
11    read_state, set_status, STATUS_DISCONNECTED, STATUS_REGISTERED,
12};
13use reqwest::StatusCode;
14use sha3::{Digest, Keccak256};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::Mutex;
18use tokio::time::sleep;
19use tracing::{info, warn};
20
21type ManagerCell = Arc<Mutex<Option<Arc<SiweTokenManager>>>>;
22type SiweTokenManager = TokenManager<DdsAuthenticator, SystemClock>;
23
24#[derive(Clone)]
25struct DdsAuthenticator {
26    base_url: Arc<String>,
27    priv_hex: Arc<String>,
28    address: Arc<String>,
29}
30
31impl DdsAuthenticator {
32    fn new(base_url: String, priv_hex: String) -> Result<Self> {
33        let address = derive_eth_address(&priv_hex)?;
34        Ok(Self {
35            base_url: Arc::new(base_url),
36            priv_hex: Arc::new(priv_hex),
37            address: Arc::new(address),
38        })
39    }
40}
41
42#[async_trait]
43impl AccessAuthenticator for DdsAuthenticator {
44    async fn login(&self) -> Result<super::siwe::AccessBundle, super::siwe::SiweError> {
45        let meta = siwe::request_nonce(self.base_url.as_str(), self.address.as_str()).await?;
46        let message = siwe::compose_message(&meta, self.address.as_str(), None)?;
47        let signature = siwe::sign_message(self.priv_hex.as_str(), &message)?;
48        match siwe::verify(
49            self.base_url.as_str(),
50            self.address.as_str(),
51            &message,
52            &signature,
53        )
54        .await
55        {
56            Ok(bundle) => Ok(bundle),
57            Err(err) => {
58                rearm_registration_if_invalid(&err);
59                Err(err)
60            }
61        }
62    }
63}
64
65fn rearm_registration_if_invalid(err: &siwe::SiweError) {
66    let Some(status) = err.status_code() else {
67        return;
68    };
69    if status != StatusCode::FORBIDDEN && status != StatusCode::NOT_FOUND {
70        return;
71    }
72
73    if let Ok(state) = read_state() {
74        if state.status == STATUS_DISCONNECTED {
75            return;
76        }
77    }
78
79    match set_status(STATUS_DISCONNECTED) {
80        Ok(()) => warn!(
81            status = %status,
82            "DDS SIWE verification indicates registration is no longer valid; re-arming registration"
83        ),
84        Err(set_err) => warn!(
85            status = %status,
86            error = %set_err,
87            "failed to re-arm registration after DDS SIWE verification invalidation"
88        ),
89    }
90}
91
92pub struct SiweAfterRegistration {
93    authenticator: Arc<DdsAuthenticator>,
94    config: TokenManagerConfig,
95    manager: ManagerCell,
96}
97
98impl SiweAfterRegistration {
99    pub fn from_config(cfg: &NodeConfig) -> Result<Self> {
100        let dds_base_url = cfg
101            .dds_base_url
102            .as_ref()
103            .ok_or_else(|| anyhow!("DDS_BASE_URL required for DDS SIWE authentication"))?
104            .as_str()
105            .to_string();
106
107        let priv_hex = cfg
108            .secp256k1_privhex
109            .as_ref()
110            .filter(|value| !value.trim().is_empty())
111            .ok_or_else(|| anyhow!("SECP256K1_PRIVHEX required for DDS SIWE authentication"))?
112            .to_string();
113
114        let config = TokenManagerConfig {
115            safety_ratio: cfg.token_safety_ratio as f64,
116            max_retries: cfg.token_reauth_max_retries,
117            jitter: Duration::from_millis(cfg.token_reauth_jitter_ms),
118        };
119
120        Self::new(dds_base_url, priv_hex, config)
121    }
122
123    pub fn new(dds_base_url: String, priv_hex: String, config: TokenManagerConfig) -> Result<Self> {
124        let authenticator = Arc::new(DdsAuthenticator::new(dds_base_url, priv_hex)?);
125        Ok(Self {
126            authenticator,
127            config,
128            manager: Arc::new(Mutex::new(None)),
129        })
130    }
131
132    pub async fn start(&self) -> Result<SiweHandle> {
133        let manager = self.ensure_started().await?;
134        Ok(SiweHandle { manager })
135    }
136
137    async fn ensure_started(&self) -> Result<Arc<SiweTokenManager>> {
138        {
139            let guard = self.manager.lock().await;
140            if let Some(existing) = guard.as_ref() {
141                return Ok(existing.clone());
142            }
143        }
144
145        self.wait_for_registration().await?;
146
147        let manager: Arc<SiweTokenManager> = Arc::new(TokenManager::new(
148            Arc::clone(&self.authenticator),
149            Arc::new(SystemClock),
150            self.config.clone(),
151        ));
152        manager.start_bg().await;
153
154        manager
155            .bearer()
156            .await
157            .map_err(|err| anyhow!("initial DDS SIWE login failed: {err}"))?;
158
159        let mut guard = self.manager.lock().await;
160        if let Some(existing) = guard.as_ref() {
161            manager.stop_bg().await;
162            return Ok(existing.clone());
163        }
164        *guard = Some(manager.clone());
165        Ok(manager)
166    }
167
168    async fn wait_for_registration(&self) -> Result<()> {
169        loop {
170            // Prefer the explicit registration state first.
171            match read_state() {
172                Ok(state) if state.status == STATUS_REGISTERED => {
173                    info!(
174                        "DDS registration confirmed (status=registered); starting SIWE token manager"
175                    );
176                    return Ok(());
177                }
178                Ok(_) => {
179                    // Fall through to secret check as a secondary signal (legacy flows).
180                }
181                Err(err) => {
182                    warn!(error = %err, "Failed to read DDS registration state; retrying");
183                }
184            }
185
186            match dds_state::read_node_secret() {
187                Ok(Some(_)) => {
188                    info!(
189                        "DDS registration confirmed (secret present); starting SIWE token manager"
190                    );
191                    return Ok(());
192                }
193                Ok(None) => {}
194                Err(err) => {
195                    warn!(error = %err, "Failed to read DDS registration secret; retrying");
196                }
197            }
198
199            sleep(Duration::from_secs(1)).await;
200        }
201    }
202}
203
204#[derive(Clone)]
205pub struct SiweHandle {
206    manager: Arc<SiweTokenManager>,
207}
208
209impl SiweHandle {
210    pub async fn bearer(&self) -> Result<String, TokenProviderError> {
211        self.manager.bearer().await
212    }
213
214    pub async fn shutdown(&self) {
215        self.manager.stop_bg().await;
216    }
217}
218
219#[async_trait]
220impl TokenProvider for SiweHandle {
221    async fn bearer(&self) -> crate::auth::token_manager::TokenProviderResult<String> {
222        // Delegate to internal manager
223        self.manager.bearer().await
224    }
225
226    async fn on_unauthorized(&self) {
227        // Force early refresh on next bearer() call
228        self.manager.on_unauthorized_retry().await;
229    }
230}
231
232fn derive_eth_address(priv_hex: &str) -> Result<String> {
233    use k256::{ecdsa::SigningKey, FieldBytes};
234
235    let trimmed = priv_hex.trim_start_matches("0x");
236    let key_bytes =
237        hex::decode(trimmed).map_err(|_| anyhow!("invalid secp256k1 private key hex"))?;
238    if key_bytes.len() != 32 {
239        return Err(anyhow!("secp256k1 private key must be 32 bytes"));
240    }
241    let mut key = [0u8; 32];
242    key.copy_from_slice(&key_bytes);
243    let field_bytes: FieldBytes = key.into();
244    let signing_key = SigningKey::from_bytes(&field_bytes)
245        .map_err(|e| anyhow!("failed to construct signing key: {e}"))?;
246    let verifying_key = signing_key.verifying_key();
247    let encoded = verifying_key.to_encoded_point(false);
248    let pubkey = encoded.as_bytes();
249
250    let mut hasher = Keccak256::new();
251    hasher.update(&pubkey[1..]);
252    let hashed = hasher.finalize();
253    let address_bytes = &hashed[12..];
254    Ok(format!("0x{}", hex::encode(address_bytes)))
255}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use posemesh_node_registration::state::{write_state, RegistrationState};
261    use reqwest::StatusCode;
262    use std::sync::{Mutex, OnceLock};
263    use url::Url;
264
265    fn test_lock() -> &'static Mutex<()> {
266        static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
267        TEST_LOCK.get_or_init(|| Mutex::new(()))
268    }
269
270    fn base_cfg() -> NodeConfig {
271        NodeConfig {
272            dms_base_url: Url::parse("https://dms.example").unwrap(),
273            node_version: "1.0.0".into(),
274            request_timeout_secs: 10,
275            dds_base_url: None,
276            reg_secret: None,
277            secp256k1_privhex: None,
278            heartbeat_jitter_ms: 250,
279            heartbeat_min_ratio: 0.25,
280            heartbeat_max_ratio: 0.35,
281            poll_backoff_ms_min: 1000,
282            poll_backoff_ms_max: 30000,
283            token_safety_ratio: 0.75,
284            token_reauth_max_retries: 3,
285            token_reauth_jitter_ms: 500,
286            register_interval_secs: None,
287            register_max_retry: None,
288            max_concurrency: 1,
289            log_format: crate::config::LogFormat::Json,
290            enable_noop: true,
291            noop_sleep_secs: 1,
292        }
293    }
294
295    #[test]
296    fn from_config_errors_when_missing_siwe_fields() {
297        let cfg = base_cfg();
298        assert!(SiweAfterRegistration::from_config(&cfg).is_err());
299    }
300
301    #[test]
302    fn derive_eth_address_matches_expected_value() {
303        let priv_hex = "4c0883a69102937d6231471b5dbb6204fe5129617082798ce3f4fdf2548b6f90";
304        let addr = derive_eth_address(priv_hex).expect("address");
305        assert_eq!(addr, "0xfdbb6caf01414300c16ea14859fec7736d95355f");
306    }
307
308    #[test]
309    fn from_config_produces_instance_when_siwe_configured() {
310        let mut cfg = base_cfg();
311        cfg.dds_base_url = Some(Url::parse("https://dds.example").unwrap());
312        cfg.secp256k1_privhex =
313            Some("4c0883a69102937d6231471b5dbb6204fe5129617082798ce3f4fdf2548b6f90".into());
314
315        assert!(SiweAfterRegistration::from_config(&cfg).is_ok());
316    }
317
318    #[test]
319    fn forbidden_siwe_verify_rearms_registration() {
320        let _guard = test_lock().lock().unwrap();
321        write_state(&RegistrationState {
322            status: STATUS_REGISTERED.to_string(),
323            last_healthcheck: None,
324        })
325        .unwrap();
326
327        rearm_registration_if_invalid(&siwe::SiweError::UpstreamStatus(StatusCode::FORBIDDEN));
328
329        assert_eq!(read_state().unwrap().status, STATUS_DISCONNECTED);
330    }
331
332    #[test]
333    fn unrelated_siwe_error_keeps_registration_state() {
334        let _guard = test_lock().lock().unwrap();
335        write_state(&RegistrationState {
336            status: STATUS_REGISTERED.to_string(),
337            last_healthcheck: None,
338        })
339        .unwrap();
340
341        rearm_registration_if_invalid(&siwe::SiweError::UpstreamStatus(StatusCode::BAD_GATEWAY));
342
343        assert_eq!(read_state().unwrap().status, STATUS_REGISTERED);
344    }
345}