posemesh_compute_node/auth/
siwe_after_registration.rs

1use super::siwe;
2use super::token_manager::{
3    AccessAuthenticator, SystemClock, TokenManager, TokenManagerConfig, TokenProvider,
4    TokenProviderError,
5};
6use crate::config::NodeConfig;
7use crate::dds::persist as dds_state;
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use posemesh_node_registration::state::read_state;
11use sha3::{Digest, Keccak256};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15use tokio::time::sleep;
16use tracing::{info, warn};
17
18type ManagerCell = Arc<Mutex<Option<Arc<SiweTokenManager>>>>;
19type SiweTokenManager = TokenManager<DdsAuthenticator, SystemClock>;
20
/// Inputs needed to perform a SIWE login against DDS.
///
/// Every field is reference-counted, so cloning the authenticator only bumps
/// counters instead of copying the strings.
#[derive(Clone)]
struct DdsAuthenticator {
    /// Address derived from `priv_hex` at construction time.
    address: Arc<String>,
    /// Base URL handed to the `siwe` request helpers.
    base_url: Arc<String>,
    /// Hex-encoded secp256k1 private key used to sign the login message.
    priv_hex: Arc<String>,
}
27
28impl DdsAuthenticator {
29    fn new(base_url: String, priv_hex: String) -> Result<Self> {
30        let address = derive_eth_address(&priv_hex)?;
31        Ok(Self {
32            base_url: Arc::new(base_url),
33            priv_hex: Arc::new(priv_hex),
34            address: Arc::new(address),
35        })
36    }
37}
38
39#[async_trait]
40impl AccessAuthenticator for DdsAuthenticator {
41    async fn login(&self) -> Result<super::siwe::AccessBundle, super::siwe::SiweError> {
42        let meta = siwe::request_nonce(self.base_url.as_str(), self.address.as_str()).await?;
43        let message = siwe::compose_message(&meta, self.address.as_str(), None)?;
44        let signature = siwe::sign_message(self.priv_hex.as_str(), &message)?;
45        siwe::verify(
46            self.base_url.as_str(),
47            self.address.as_str(),
48            &message,
49            &signature,
50        )
51        .await
52    }
53}
54
55pub struct SiweAfterRegistration {
56    authenticator: Arc<DdsAuthenticator>,
57    config: TokenManagerConfig,
58    manager: ManagerCell,
59}
60
61impl SiweAfterRegistration {
62    pub fn from_config(cfg: &NodeConfig) -> Result<Self> {
63        let dds_base_url = cfg
64            .dds_base_url
65            .as_ref()
66            .ok_or_else(|| anyhow!("DDS_BASE_URL required for DDS SIWE authentication"))?
67            .as_str()
68            .to_string();
69
70        let priv_hex = cfg
71            .secp256k1_privhex
72            .as_ref()
73            .filter(|value| !value.trim().is_empty())
74            .ok_or_else(|| anyhow!("SECP256K1_PRIVHEX required for DDS SIWE authentication"))?
75            .to_string();
76
77        let config = TokenManagerConfig {
78            safety_ratio: cfg.token_safety_ratio as f64,
79            max_retries: cfg.token_reauth_max_retries,
80            jitter: Duration::from_millis(cfg.token_reauth_jitter_ms),
81        };
82
83        Self::new(dds_base_url, priv_hex, config)
84    }
85
86    pub fn new(dds_base_url: String, priv_hex: String, config: TokenManagerConfig) -> Result<Self> {
87        let authenticator = Arc::new(DdsAuthenticator::new(dds_base_url, priv_hex)?);
88        Ok(Self {
89            authenticator,
90            config,
91            manager: Arc::new(Mutex::new(None)),
92        })
93    }
94
95    pub async fn start(&self) -> Result<SiweHandle> {
96        let manager = self.ensure_started().await?;
97        Ok(SiweHandle { manager })
98    }
99
100    async fn ensure_started(&self) -> Result<Arc<SiweTokenManager>> {
101        {
102            let guard = self.manager.lock().await;
103            if let Some(existing) = guard.as_ref() {
104                return Ok(existing.clone());
105            }
106        }
107
108        self.wait_for_registration().await?;
109
110        let manager: Arc<SiweTokenManager> = Arc::new(TokenManager::new(
111            Arc::clone(&self.authenticator),
112            Arc::new(SystemClock),
113            self.config.clone(),
114        ));
115        manager.start_bg().await;
116
117        manager
118            .bearer()
119            .await
120            .map_err(|err| anyhow!("initial DDS SIWE login failed: {err}"))?;
121
122        let mut guard = self.manager.lock().await;
123        if let Some(existing) = guard.as_ref() {
124            manager.stop_bg().await;
125            return Ok(existing.clone());
126        }
127        *guard = Some(manager.clone());
128        Ok(manager)
129    }
130
131    async fn wait_for_registration(&self) -> Result<()> {
132        loop {
133            // Prefer the explicit registration state first.
134            match read_state() {
135                Ok(state)
136                    if state.status == posemesh_node_registration::state::STATUS_REGISTERED =>
137                {
138                    info!("DDS registration confirmed (status=registered); starting SIWE token manager");
139                    return Ok(());
140                }
141                Ok(_) => {
142                    // Fall through to secret check as a secondary signal (legacy flows).
143                }
144                Err(err) => {
145                    warn!(error = %err, "Failed to read DDS registration state; retrying");
146                }
147            }
148
149            match dds_state::read_node_secret() {
150                Ok(Some(_)) => {
151                    info!(
152                        "DDS registration confirmed (secret present); starting SIWE token manager"
153                    );
154                    return Ok(());
155                }
156                Ok(None) => {}
157                Err(err) => {
158                    warn!(error = %err, "Failed to read DDS registration secret; retrying");
159                }
160            }
161
162            sleep(Duration::from_secs(1)).await;
163        }
164    }
165}
166
167#[derive(Clone)]
168pub struct SiweHandle {
169    manager: Arc<SiweTokenManager>,
170}
171
172impl SiweHandle {
173    pub async fn bearer(&self) -> Result<String, TokenProviderError> {
174        self.manager.bearer().await
175    }
176
177    pub async fn shutdown(&self) {
178        self.manager.stop_bg().await;
179    }
180}
181
182#[async_trait]
183impl TokenProvider for SiweHandle {
184    async fn bearer(&self) -> crate::auth::token_manager::TokenProviderResult<String> {
185        // Delegate to internal manager
186        self.manager.bearer().await
187    }
188
189    async fn on_unauthorized(&self) {
190        // Force early refresh on next bearer() call
191        self.manager.on_unauthorized_retry().await;
192    }
193}
194
195fn derive_eth_address(priv_hex: &str) -> Result<String> {
196    use k256::{ecdsa::SigningKey, FieldBytes};
197
198    let trimmed = priv_hex.trim_start_matches("0x");
199    let key_bytes =
200        hex::decode(trimmed).map_err(|_| anyhow!("invalid secp256k1 private key hex"))?;
201    if key_bytes.len() != 32 {
202        return Err(anyhow!("secp256k1 private key must be 32 bytes"));
203    }
204    let mut key = [0u8; 32];
205    key.copy_from_slice(&key_bytes);
206    let field_bytes: FieldBytes = key.into();
207    let signing_key = SigningKey::from_bytes(&field_bytes)
208        .map_err(|e| anyhow!("failed to construct signing key: {e}"))?;
209    let verifying_key = signing_key.verifying_key();
210    let encoded = verifying_key.to_encoded_point(false);
211    let pubkey = encoded.as_bytes();
212
213    let mut hasher = Keccak256::new();
214    hasher.update(&pubkey[1..]);
215    let hashed = hasher.finalize();
216    let address_bytes = &hashed[12..];
217    Ok(format!("0x{}", hex::encode(address_bytes)))
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use url::Url;

    /// Private-key fixture shared by the tests below.
    const SAMPLE_PRIV_HEX: &str =
        "4c0883a69102937d6231471b5dbb6204fe5129617082798ce3f4fdf2548b6f90";
    /// Address the tests expect `derive_eth_address` to produce for `SAMPLE_PRIV_HEX`.
    const SAMPLE_ADDRESS: &str = "0xfdbb6caf01414300c16ea14859fec7736d95355f";

    /// Baseline configuration with neither SIWE field (`dds_base_url`,
    /// `secp256k1_privhex`) set.
    fn base_cfg() -> NodeConfig {
        NodeConfig {
            dms_base_url: Url::parse("https://dms.example").unwrap(),
            node_version: "1.0.0".into(),
            request_timeout_secs: 10,
            dds_base_url: None,
            node_url: None,
            reg_secret: None,
            secp256k1_privhex: None,
            heartbeat_jitter_ms: 250,
            poll_backoff_ms_min: 1000,
            poll_backoff_ms_max: 30000,
            token_safety_ratio: 0.75,
            token_reauth_max_retries: 3,
            token_reauth_jitter_ms: 500,
            register_interval_secs: None,
            register_max_retry: None,
            max_concurrency: 1,
            log_format: crate::config::LogFormat::Json,
            enable_noop: true,
            noop_sleep_secs: 1,
        }
    }

    #[test]
    fn from_config_errors_when_missing_siwe_fields() {
        let outcome = SiweAfterRegistration::from_config(&base_cfg());
        assert!(outcome.is_err());
    }

    #[test]
    fn derive_eth_address_matches_expected_value() {
        let addr = derive_eth_address(SAMPLE_PRIV_HEX).expect("address");
        assert_eq!(addr, SAMPLE_ADDRESS);
    }

    #[test]
    fn from_config_produces_instance_when_siwe_configured() {
        let mut cfg = base_cfg();
        cfg.dds_base_url = Some(Url::parse("https://dds.example").unwrap());
        cfg.secp256k1_privhex = Some(SAMPLE_PRIV_HEX.into());

        let outcome = SiweAfterRegistration::from_config(&cfg);
        assert!(outcome.is_ok());
    }
}
271}