posemesh_compute_node/auth/
siwe_after_registration.rs1use super::siwe;
2use super::token_manager::{
3 AccessAuthenticator, SystemClock, TokenManager, TokenManagerConfig, TokenProvider,
4 TokenProviderError,
5};
6use crate::config::NodeConfig;
7use crate::dds::persist as dds_state;
8use anyhow::{anyhow, Result};
9use async_trait::async_trait;
10use posemesh_node_registration::state::read_state;
11use sha3::{Digest, Keccak256};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15use tokio::time::sleep;
16use tracing::{info, warn};
17
18type ManagerCell = Arc<Mutex<Option<Arc<SiweTokenManager>>>>;
19type SiweTokenManager = TokenManager<DdsAuthenticator, SystemClock>;
20
21#[derive(Clone)]
22struct DdsAuthenticator {
23 base_url: Arc<String>,
24 priv_hex: Arc<String>,
25 address: Arc<String>,
26}
27
28impl DdsAuthenticator {
29 fn new(base_url: String, priv_hex: String) -> Result<Self> {
30 let address = derive_eth_address(&priv_hex)?;
31 Ok(Self {
32 base_url: Arc::new(base_url),
33 priv_hex: Arc::new(priv_hex),
34 address: Arc::new(address),
35 })
36 }
37}
38
39#[async_trait]
40impl AccessAuthenticator for DdsAuthenticator {
41 async fn login(&self) -> Result<super::siwe::AccessBundle, super::siwe::SiweError> {
42 let meta = siwe::request_nonce(self.base_url.as_str(), self.address.as_str()).await?;
43 let message = siwe::compose_message(&meta, self.address.as_str(), None)?;
44 let signature = siwe::sign_message(self.priv_hex.as_str(), &message)?;
45 siwe::verify(
46 self.base_url.as_str(),
47 self.address.as_str(),
48 &message,
49 &signature,
50 )
51 .await
52 }
53}
54
55pub struct SiweAfterRegistration {
56 authenticator: Arc<DdsAuthenticator>,
57 config: TokenManagerConfig,
58 manager: ManagerCell,
59}
60
61impl SiweAfterRegistration {
62 pub fn from_config(cfg: &NodeConfig) -> Result<Self> {
63 let dds_base_url = cfg
64 .dds_base_url
65 .as_ref()
66 .ok_or_else(|| anyhow!("DDS_BASE_URL required for DDS SIWE authentication"))?
67 .as_str()
68 .to_string();
69
70 let priv_hex = cfg
71 .secp256k1_privhex
72 .as_ref()
73 .filter(|value| !value.trim().is_empty())
74 .ok_or_else(|| anyhow!("SECP256K1_PRIVHEX required for DDS SIWE authentication"))?
75 .to_string();
76
77 let config = TokenManagerConfig {
78 safety_ratio: cfg.token_safety_ratio as f64,
79 max_retries: cfg.token_reauth_max_retries,
80 jitter: Duration::from_millis(cfg.token_reauth_jitter_ms),
81 };
82
83 Self::new(dds_base_url, priv_hex, config)
84 }
85
86 pub fn new(dds_base_url: String, priv_hex: String, config: TokenManagerConfig) -> Result<Self> {
87 let authenticator = Arc::new(DdsAuthenticator::new(dds_base_url, priv_hex)?);
88 Ok(Self {
89 authenticator,
90 config,
91 manager: Arc::new(Mutex::new(None)),
92 })
93 }
94
95 pub async fn start(&self) -> Result<SiweHandle> {
96 let manager = self.ensure_started().await?;
97 Ok(SiweHandle { manager })
98 }
99
100 async fn ensure_started(&self) -> Result<Arc<SiweTokenManager>> {
101 {
102 let guard = self.manager.lock().await;
103 if let Some(existing) = guard.as_ref() {
104 return Ok(existing.clone());
105 }
106 }
107
108 self.wait_for_registration().await?;
109
110 let manager: Arc<SiweTokenManager> = Arc::new(TokenManager::new(
111 Arc::clone(&self.authenticator),
112 Arc::new(SystemClock),
113 self.config.clone(),
114 ));
115 manager.start_bg().await;
116
117 manager
118 .bearer()
119 .await
120 .map_err(|err| anyhow!("initial DDS SIWE login failed: {err}"))?;
121
122 let mut guard = self.manager.lock().await;
123 if let Some(existing) = guard.as_ref() {
124 manager.stop_bg().await;
125 return Ok(existing.clone());
126 }
127 *guard = Some(manager.clone());
128 Ok(manager)
129 }
130
131 async fn wait_for_registration(&self) -> Result<()> {
132 loop {
133 match read_state() {
135 Ok(state)
136 if state.status == posemesh_node_registration::state::STATUS_REGISTERED =>
137 {
138 info!("DDS registration confirmed (status=registered); starting SIWE token manager");
139 return Ok(());
140 }
141 Ok(_) => {
142 }
144 Err(err) => {
145 warn!(error = %err, "Failed to read DDS registration state; retrying");
146 }
147 }
148
149 match dds_state::read_node_secret() {
150 Ok(Some(_)) => {
151 info!(
152 "DDS registration confirmed (secret present); starting SIWE token manager"
153 );
154 return Ok(());
155 }
156 Ok(None) => {}
157 Err(err) => {
158 warn!(error = %err, "Failed to read DDS registration secret; retrying");
159 }
160 }
161
162 sleep(Duration::from_secs(1)).await;
163 }
164 }
165}
166
167#[derive(Clone)]
168pub struct SiweHandle {
169 manager: Arc<SiweTokenManager>,
170}
171
172impl SiweHandle {
173 pub async fn bearer(&self) -> Result<String, TokenProviderError> {
174 self.manager.bearer().await
175 }
176
177 pub async fn shutdown(&self) {
178 self.manager.stop_bg().await;
179 }
180}
181
182#[async_trait]
183impl TokenProvider for SiweHandle {
184 async fn bearer(&self) -> crate::auth::token_manager::TokenProviderResult<String> {
185 self.manager.bearer().await
187 }
188
189 async fn on_unauthorized(&self) {
190 self.manager.on_unauthorized_retry().await;
192 }
193}
194
195fn derive_eth_address(priv_hex: &str) -> Result<String> {
196 use k256::{ecdsa::SigningKey, FieldBytes};
197
198 let trimmed = priv_hex.trim_start_matches("0x");
199 let key_bytes =
200 hex::decode(trimmed).map_err(|_| anyhow!("invalid secp256k1 private key hex"))?;
201 if key_bytes.len() != 32 {
202 return Err(anyhow!("secp256k1 private key must be 32 bytes"));
203 }
204 let mut key = [0u8; 32];
205 key.copy_from_slice(&key_bytes);
206 let field_bytes: FieldBytes = key.into();
207 let signing_key = SigningKey::from_bytes(&field_bytes)
208 .map_err(|e| anyhow!("failed to construct signing key: {e}"))?;
209 let verifying_key = signing_key.verifying_key();
210 let encoded = verifying_key.to_encoded_point(false);
211 let pubkey = encoded.as_bytes();
212
213 let mut hasher = Keccak256::new();
214 hasher.update(&pubkey[1..]);
215 let hashed = hasher.finalize();
216 let address_bytes = &hashed[12..];
217 Ok(format!("0x{}", hex::encode(address_bytes)))
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223 use url::Url;
224
225 fn base_cfg() -> NodeConfig {
226 NodeConfig {
227 dms_base_url: Url::parse("https://dms.example").unwrap(),
228 node_version: "1.0.0".into(),
229 request_timeout_secs: 10,
230 dds_base_url: None,
231 node_url: None,
232 reg_secret: None,
233 secp256k1_privhex: None,
234 heartbeat_jitter_ms: 250,
235 heartbeat_min_ratio: 0.25,
236 heartbeat_max_ratio: 0.35,
237 poll_backoff_ms_min: 1000,
238 poll_backoff_ms_max: 30000,
239 token_safety_ratio: 0.75,
240 token_reauth_max_retries: 3,
241 token_reauth_jitter_ms: 500,
242 register_interval_secs: None,
243 register_max_retry: None,
244 max_concurrency: 1,
245 log_format: crate::config::LogFormat::Json,
246 enable_noop: true,
247 noop_sleep_secs: 1,
248 }
249 }
250
251 #[test]
252 fn from_config_errors_when_missing_siwe_fields() {
253 let cfg = base_cfg();
254 assert!(SiweAfterRegistration::from_config(&cfg).is_err());
255 }
256
257 #[test]
258 fn derive_eth_address_matches_expected_value() {
259 let priv_hex = "4c0883a69102937d6231471b5dbb6204fe5129617082798ce3f4fdf2548b6f90";
260 let addr = derive_eth_address(priv_hex).expect("address");
261 assert_eq!(addr, "0xfdbb6caf01414300c16ea14859fec7736d95355f");
262 }
263
264 #[test]
265 fn from_config_produces_instance_when_siwe_configured() {
266 let mut cfg = base_cfg();
267 cfg.dds_base_url = Some(Url::parse("https://dds.example").unwrap());
268 cfg.secp256k1_privhex =
269 Some("4c0883a69102937d6231471b5dbb6204fe5129617082798ce3f4fdf2548b6f90".into());
270
271 assert!(SiweAfterRegistration::from_config(&cfg).is_ok());
272 }
273}